You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:56:26 UTC
[47/54] [partial] ACCUMULO-658,
ACCUMULO-656 Split server into separate modules
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
new file mode 100644
index 0000000..bae614d
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class MetaDataTableScanner implements Iterator<TabletLocationState> {
+ private static final Logger log = Logger.getLogger(MetaDataTableScanner.class);
+
+ BatchScanner mdScanner = null;
+ Iterator<Entry<Key,Value>> iter;
+
+ public MetaDataTableScanner(Instance instance, Credentials credentials, Range range, CurrentState state) {
+ this(instance, credentials, range, state, MetadataTable.NAME);
+ }
+
+ MetaDataTableScanner(Instance instance, Credentials credentials, Range range, CurrentState state, String tableName) {
+ // scan over metadata table, looking for tablets in the wrong state based on the live servers and online tables
+ try {
+ Connector connector = instance.getConnector(credentials.getPrincipal(), credentials.getToken());
+ mdScanner = connector.createBatchScanner(tableName, Authorizations.EMPTY, 8);
+ configureScanner(mdScanner, state);
+ mdScanner.setRanges(Collections.singletonList(range));
+ iter = mdScanner.iterator();
+ } catch (Exception ex) {
+ if (mdScanner != null)
+ mdScanner.close();
+ throw new RuntimeException(ex);
+ }
+ }
+
+ static public void configureScanner(ScannerBase scanner, CurrentState state) {
+ TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(LogColumnFamily.NAME);
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class));
+ IteratorSetting tabletChange = new IteratorSetting(1001, "tabletChange", TabletStateChangeIterator.class);
+ if (state != null) {
+ TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers());
+ TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables());
+ TabletStateChangeIterator.setMerges(tabletChange, state.merges());
+ }
+ scanner.addScanIterator(tabletChange);
+ }
+
+ public MetaDataTableScanner(Instance instance, Credentials credentials, Range range) {
+ this(instance, credentials, range, MetadataTable.NAME);
+ }
+
+ public MetaDataTableScanner(Instance instance, Credentials credentials, Range range, String tableName) {
+ this(instance, credentials, range, null, tableName);
+ }
+
+ public void close() {
+ if (iter != null) {
+ mdScanner.close();
+ iter = null;
+ }
+ }
+
+ @Override
+ public void finalize() {
+ close();
+ }
+
+ /** Reports whether another row is available; once the underlying iterator is exhausted the scanner is closed and false is returned. */
+ @Override
+ public boolean hasNext() {
+ if (iter == null)
+ return false;
+ if (iter.hasNext())
+ return true;
+ close();
+ return false;
+ }
+
+ /** Returns the next tablet state; null when the row has no prev-row entry, or (after logging and closing this scanner) when it cannot be read. */
+ @Override
+ public TabletLocationState next() {
+ try {
+ return fetch();
+ } catch (RuntimeException ex) {
+ log.error(ex, ex);
+ close(); // bad records in the !METADATA table: stop scanning; close() also clears iter so hasNext() then reports false
+ return null;
+ }
+ }
+
+ /**
+ * Decodes one WholeRowIterator-encoded metadata row into a TabletLocationState. Returns null (after logging a warning) when the row has no prev-row entry;
+ * throws BadLocationStateException when the row holds two future, two current or two last locations.
+ */
+ public static TabletLocationState createTabletLocationState(Key k, Value v) throws IOException, BadLocationStateException {
+ final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v);
+ KeyExtent extent = null;
+ TServerInstance future = null;
+ TServerInstance current = null;
+ TServerInstance last = null;
+ List<Collection<String>> walogs = new ArrayList<Collection<String>>();
+ boolean chopped = false;
+
+ for (Entry<Key,Value> entry : decodedRow.entrySet()) {
+ Key key = entry.getKey();
+ Text row = key.getRow();
+ Text cf = key.getColumnFamily();
+ Text cq = key.getColumnQualifier();
+ if (cf.compareTo(TabletsSection.FutureLocationColumnFamily.NAME) == 0) {
+ TServerInstance location = new TServerInstance(entry.getValue(), cq);
+ if (future != null)
+ throw new BadLocationStateException("found two assignments for the same extent " + row + ": " + future + " and " + location);
+ future = location;
+ } else if (cf.compareTo(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) {
+ TServerInstance location = new TServerInstance(entry.getValue(), cq);
+ if (current != null)
+ throw new BadLocationStateException("found two locations for the same extent " + row + ": " + current + " and " + location);
+ current = location;
+ } else if (cf.compareTo(LogColumnFamily.NAME) == 0) {
+ String[] split = entry.getValue().toString().split("\\|")[0].split(";");
+ walogs.add(Arrays.asList(split));
+ } else if (cf.compareTo(TabletsSection.LastLocationColumnFamily.NAME) == 0) {
+ TServerInstance location = new TServerInstance(entry.getValue(), cq);
+ if (last != null)
+ throw new BadLocationStateException("found two last locations for the same extent " + row + ": " + last + " and " + location);
+ last = location;
+ } else if (cf.compareTo(ChoppedColumnFamily.NAME) == 0) {
+ chopped = true;
+ } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.equals(cf, cq)) {
+ extent = new KeyExtent(row, entry.getValue());
+ }
+ }
+ if (extent == null) {
+ log.warn("No prev-row for key extent: " + decodedRow);
+ return null;
+ }
+ return new TabletLocationState(extent, future, current, last, walogs, chopped);
+ }
+
+ private TabletLocationState fetch() {
+ try {
+ Entry<Key,Value> e = iter.next();
+ return createTabletLocationState(e.getKey(), e.getValue());
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ } catch (BadLocationStateException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new RuntimeException("Unimplemented");
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java
new file mode 100644
index 0000000..d8220fe
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.util.Iterator;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.master.state.CurrentState;
+import org.apache.accumulo.server.master.state.MetaDataTableScanner;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+
+public class RootTabletStateStore extends MetaDataStateStore {
+
+ public RootTabletStateStore(Instance instance, Credentials credentials, CurrentState state) {
+ super(instance, credentials, state, RootTable.NAME);
+ }
+
+ public RootTabletStateStore() {
+ super(RootTable.NAME);
+ }
+
+ @Override
+ public Iterator<TabletLocationState> iterator() {
+ return new MetaDataTableScanner(instance, credentials, MetadataSchema.TabletsSection.getRange(), state, RootTable.NAME);
+ }
+
+ @Override
+ public String name() {
+ return "Metadata Tablets";
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
new file mode 100644
index 0000000..4de64a8
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.io.Serializable;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * A tablet is assigned to a tablet server at the given address as long as it is alive and well. When the tablet server is restarted, the instance information
+ * it advertises will change. Therefore tablet assignments can be considered out-of-date if the tablet server instance information has been changed.
+ *
+ */
+public class TServerInstance implements Comparable<TServerInstance>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private HostAndPort location;
+ private String session;
+ private String cachedStringRepresentation;
+
+ public TServerInstance(HostAndPort address, String session) {
+ this.location = address;
+ this.session = session;
+ this.cachedStringRepresentation = hostPort() + "[" + session + "]";
+ }
+
+ public TServerInstance(HostAndPort address, long session) {
+ this(address, Long.toHexString(session));
+ }
+
+ public TServerInstance(String address, long session) {
+ this(AddressUtil.parseAddress(address), Long.toHexString(session));
+ }
+
+ public TServerInstance(Value address, Text session) {
+ this(AddressUtil.parseAddress(new String(address.get())), session.toString());
+ }
+
+ public void putLocation(Mutation m) {
+ m.put(TabletsSection.CurrentLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
+ }
+
+ public void putFutureLocation(Mutation m) {
+ m.put(TabletsSection.FutureLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
+ }
+
+ public void putLastLocation(Mutation m) {
+ m.put(TabletsSection.LastLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
+ }
+
+ public void clearLastLocation(Mutation m) {
+ m.putDelete(TabletsSection.LastLocationColumnFamily.NAME, asColumnQualifier());
+ }
+
+ @Override
+ public int compareTo(TServerInstance other) {
+ if (this == other)
+ return 0;
+ return this.toString().compareTo(other.toString());
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof TServerInstance) {
+ return compareTo((TServerInstance) obj) == 0;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return cachedStringRepresentation;
+ }
+
+ public int port() {
+ return getLocation().getPort();
+ }
+
+ public String host() {
+ return getLocation().getHostText();
+ }
+
+ public String hostPort() {
+ return getLocation().toString();
+ }
+
+ public Text asColumnQualifier() {
+ return new Text(this.getSession());
+ }
+
+ public Value asMutationValue() {
+ return new Value(getLocation().toString().getBytes());
+ }
+
+ public HostAndPort getLocation() {
+ return location;
+ }
+
+ public String getSession() {
+ return session;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
new file mode 100644
index 0000000..bcfaead
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.KeyExtent;
+
+/**
+ * When a tablet is assigned, we mark its future location. When the tablet is opened, we set its current location. A tablet should never have both a future and
+ * current location.
+ *
+ * A tablet server is always associated with a unique session id. If the current tablet server has a different session, we know the location information is
+ * out-of-date.
+ */
+public class TabletLocationState {
+
+ static public class BadLocationStateException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ BadLocationStateException(String msg) { super(msg); }
+ }
+
+ public TabletLocationState(KeyExtent extent, TServerInstance future, TServerInstance current, TServerInstance last, Collection<Collection<String>> walogs,
+ boolean chopped) throws BadLocationStateException {
+ this.extent = extent;
+ this.future = future;
+ this.current = current;
+ this.last = last;
+ if (walogs == null)
+ walogs = Collections.emptyList();
+ this.walogs = walogs;
+ this.chopped = chopped;
+ if (current != null && future != null) {
+ throw new BadLocationStateException(extent + " is both assigned and hosted, which should never happen: " + this);
+ }
+ }
+
+ final public KeyExtent extent;
+ final public TServerInstance future;
+ final public TServerInstance current;
+ final public TServerInstance last;
+ final public Collection<Collection<String>> walogs;
+ final public boolean chopped;
+
+ public String toString() {
+ return extent + "@(" + future + "," + current + "," + last + ")" + (chopped ? " chopped" : "");
+ }
+
+ /**
+ * Picks the location that best describes where this tablet is: the current one if set, otherwise the future one, otherwise the last one.
+ * The result is null when none of the three is set.
+ */
+ public TServerInstance getServer() {
+ if (current != null)
+ return current;
+ if (future != null)
+ return future;
+ return last;
+ }
+
+ /**
+ * Classifies this tablet against the given set of live tablet servers.
+ *
+ * @return UNASSIGNED when there is no current or future location, ASSIGNED_TO_DEAD_SERVER when that location is not in liveServers, otherwise ASSIGNED
+ *         for a future location and HOSTED for a current one
+ */
+ public TabletState getState(Set<TServerInstance> liveServers) {
+ TServerInstance server = getServer();
+ if (server == null)
+ return TabletState.UNASSIGNED;
+ boolean isFuture = server.equals(future);
+ // neither current nor future matches, so only a last location is known
+ if (!isFuture && !server.equals(current))
+ return TabletState.UNASSIGNED;
+ if (!liveServers.contains(server))
+ return TabletState.ASSIGNED_TO_DEAD_SERVER;
+ return isFuture ? TabletState.ASSIGNED : TabletState.HOSTED;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletMigration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletMigration.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletMigration.java
new file mode 100644
index 0000000..f0a3664
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletMigration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import org.apache.accumulo.core.data.KeyExtent;
+
+public class TabletMigration {
+ public KeyExtent tablet;
+ public TServerInstance oldServer;
+ public TServerInstance newServer;
+
+ public TabletMigration(KeyExtent extent, TServerInstance before, TServerInstance after) {
+ this.tablet = extent;
+ this.oldServer = before;
+ this.newServer = after;
+ }
+
+ public String toString() {
+ return tablet + ": " + oldServer + " -> " + newServer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletServerState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletServerState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletServerState.java
new file mode 100644
index 0000000..23f16e3
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletServerState.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+public enum TabletServerState {
+ // not a valid state, reserved for internal use only
+ RESERVED((byte) (-1)),
+
+ // the following are normally functioning states
+ NEW((byte) 0),
+ ONLINE((byte) 1),
+ UNRESPONSIVE((byte) 2),
+ DOWN((byte) 3),
+
+ // the following are bad states and cause tservers to be ignored by the master
+ BAD_SYSTEM_PASSWORD((byte) 101),
+ BAD_VERSION((byte) 102),
+ BAD_INSTANCE((byte) 103),
+ BAD_CONFIG((byte) 104),
+ BAD_VERSION_AND_INSTANCE((byte) 105),
+ BAD_VERSION_AND_CONFIG((byte) 106),
+ BAD_VERSION_AND_INSTANCE_AND_CONFIG((byte) 107),
+ BAD_INSTANCE_AND_CONFIG((byte) 108);
+
+ private byte id;
+
+ private static HashMap<Byte,TabletServerState> mapping;
+ private static HashSet<TabletServerState> badStates;
+
+ static {
+ mapping = new HashMap<Byte,TabletServerState>(TabletServerState.values().length);
+ badStates = new HashSet<TabletServerState>();
+ for (TabletServerState state : TabletServerState.values()) {
+ mapping.put(state.id, state);
+ if (state.id > 99)
+ badStates.add(state);
+ }
+ }
+
+ private TabletServerState(byte id) {
+ this.id = id;
+ }
+
+ public byte getId() {
+ return this.id;
+ }
+
+ public static TabletServerState getStateById(byte id) {
+ if (mapping.containsKey(id))
+ return mapping.get(id);
+ throw new IndexOutOfBoundsException("No such state");
+ }
+
+ public static Set<TabletServerState> getBadStates() {
+ return Collections.unmodifiableSet(badStates);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java
new file mode 100644
index 0000000..d69ca19
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+public enum TabletState {
+ UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
new file mode 100644
index 0000000..ddcdeea
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+
+public class TabletStateChangeIterator extends SkippingIterator {
+
+ private static final String SERVERS_OPTION = "servers";
+ private static final String TABLES_OPTION = "tables";
+ private static final String MERGES_OPTION = "merges";
+ // private static final Logger log = Logger.getLogger(TabletStateChangeIterator.class);
+
+ Set<TServerInstance> current;
+ Set<String> onlineTables;
+ Map<Text,MergeInfo> merges;
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ current = parseServers(options.get(SERVERS_OPTION));
+ onlineTables = parseTables(options.get(TABLES_OPTION));
+ merges = parseMerges(options.get(MERGES_OPTION));
+ }
+
+ /** Turns the comma-separated tables option into a set of its entries; a missing option stays null. */
+ private Set<String> parseTables(String tables) {
+ if (tables == null)
+ return null;
+ // copy every piece produced by the split into the set in one step
+ String[] pieces = tables.split(",");
+ return new HashSet<String>(Arrays.asList(pieces));
+ }
+
+ /** Parses the comma-separated "host:port[INSTANCE]" entries of the servers option; null stays null and an empty string yields an empty set. */
+ private Set<TServerInstance> parseServers(String servers) {
+ if (servers == null)
+ return null;
+ Set<TServerInstance> result = new HashSet<TServerInstance>();
+ if (servers.length() > 0) {
+ for (String part : servers.split(",")) {
+ String[] parts = part.split("\\[", 2);
+ // without a '[' there is no session part; fail with a message that names the entry instead of a bare index error
+ if (parts.length < 2)
+ throw new IllegalArgumentException("Malformed tablet server entry (expected host:port[INSTANCE]): " + part);
+ String instance = parts[1].endsWith("]") ? parts[1].substring(0, parts[1].length() - 1) : parts[1];
+ result.add(new TServerInstance(AddressUtil.parseAddress(parts[0]), instance));
+ }
+ }
+ return result;
+ }
+
+ private Map<Text,MergeInfo> parseMerges(String merges) {
+ if (merges == null)
+ return null;
+ try {
+ Map<Text,MergeInfo> result = new HashMap<Text,MergeInfo>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.decodeBase64(merges.getBytes());
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) {
+ MergeInfo mergeInfo = new MergeInfo();
+ mergeInfo.readFields(buffer);
+ result.put(mergeInfo.extent.getTableId(), mergeInfo);
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /** Skips rows whose tablet is already in its expected state, stopping (with the row left as the top entry) at the first tablet worth reporting. */
+ @Override
+ protected void consume() throws IOException {
+ while (getSource().hasTop()) {
+ Key k = getSource().getTopKey();
+ Value v = getSource().getTopValue();
+
+ if (onlineTables == null || current == null)
+ return;
+
+ TabletLocationState tls;
+ try {
+ tls = MetaDataTableScanner.createTabletLocationState(k, v);
+ if (tls == null)
+ return;
+ } catch (BadLocationStateException e) {
+ // maybe the master can do something with a tablet with bad/inconsistent state
+ return;
+ }
+ // we always want data about merges; a missing merges option simply means there are none to consult
+ MergeInfo merge = merges == null ? null : merges.get(tls.extent.getTableId());
+ if (merge != null && merge.getExtent() != null && merge.getExtent().overlaps(tls.extent))
+ return;
+ // is the table supposed to be online or offline?
+ boolean shouldBeOnline = onlineTables.contains(tls.extent.getTableId().toString());
+
+ switch (tls.getState(current)) {
+ case ASSIGNED:
+ case ASSIGNED_TO_DEAD_SERVER:
+ // we always want data about assigned tablets, whether or not their server is still alive
+ return;
+ case HOSTED:
+ if (!shouldBeOnline)
+ return;
+ break; // hosted and meant to be online is the expected state; do not fall through into a return
+ case UNASSIGNED:
+ if (shouldBeOnline)
+ return;
+ }
+ // table is in the expected state so don't bother returning any information about it
+ getSource().next();
+ }
+ }
+
+ @Override
+ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+ throw new UnsupportedOperationException();
+ }
+
+ public static void setCurrentServers(IteratorSetting cfg, Set<TServerInstance> goodServers) {
+ if (goodServers != null) {
+ List<String> servers = new ArrayList<String>();
+ for (TServerInstance server : goodServers)
+ servers.add(server.toString());
+ cfg.addOption(SERVERS_OPTION, StringUtil.join(servers, ","));
+ }
+ }
+
+ public static void setOnlineTables(IteratorSetting cfg, Set<String> onlineTables) {
+ if (onlineTables != null)
+ cfg.addOption(TABLES_OPTION, StringUtil.join(onlineTables, ","));
+ }
+
+ public static void setMerges(IteratorSetting cfg, Collection<MergeInfo> merges) {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (MergeInfo info : merges) {
+ KeyExtent extent = info.getExtent();
+ if (extent != null && !info.getState().equals(MergeState.NONE)) {
+ info.write(buffer);
+ }
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ String encoded = new String(Base64.encodeBase64(Arrays.copyOf(buffer.getData(), buffer.getLength())));
+ cfg.addOption(MERGES_OPTION, encoded);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
new file mode 100644
index 0000000..5e19976
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.accumulo.server.master.state.TabletLocationState;
+
+/**
+ * Interface for storing information about tablet assignments. There are three implementations:
+ * <ul>
+ * <li>ZooTabletStateStore: information about the root tablet is stored in ZooKeeper</li>
+ * <li>RootTabletStateStore: used for the tablets of the metadata table</li>
+ * <li>MetaDataStateStore: information about the other tablets are stored in the metadata table</li>
+ * </ul>
+ */
+public abstract class TabletStateStore implements Iterable<TabletLocationState> {
+
+  /**
+   * Identifying name for this tablet state store.
+   */
+  public abstract String name();
+
+  /**
+   * Scan the information about the tablets covered by this store
+   */
+  @Override
+  public abstract Iterator<TabletLocationState> iterator();
+
+  /**
+   * Store the assigned locations in the data store.
+   *
+   * @param assignments
+   *          the tablet-to-server assignments to record as future locations
+   * @throws DistributedStoreException
+   */
+  public abstract void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException;
+
+  /**
+   * Tablet servers will update the data store with the location when they bring the tablet online
+   *
+   * @param assignments
+   *          the tablet-to-server assignments to record as current locations
+   * @throws DistributedStoreException
+   */
+  public abstract void setLocations(Collection<Assignment> assignments) throws DistributedStoreException;
+
+  /**
+   * Mark the tablets as having no known or future location.
+   *
+   * @param tablets
+   *          the tablets' current information
+   * @throws DistributedStoreException
+   */
+  public abstract void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException;
+
+  /**
+   * Unassigns a single tablet using the store that matches its extent: the ZooKeeper store for the root tablet, the root tablet store for metadata
+   * tablets, and the metadata store for everything else.
+   *
+   * @param tls
+   *          the tablet's current information
+   * @throws DistributedStoreException
+   */
+  public static void unassign(TabletLocationState tls) throws DistributedStoreException {
+    // the root tablet check must come first, then the metadata check
+    TabletStateStore target = tls.extent.isRootTablet() ? new ZooTabletStateStore()
+        : (tls.extent.isMeta() ? new RootTabletStateStore() : new MetaDataStateStore());
+    target.unassign(Collections.singletonList(tls));
+  }
+
+  /**
+   * Records the location of a single tablet using the store that matches its extent: the ZooKeeper store for the root tablet, the root tablet store for
+   * metadata tablets, and the metadata store for everything else.
+   *
+   * @param assignment
+   *          the tablet and the server it is now hosted on
+   * @throws DistributedStoreException
+   */
+  public static void setLocation(Assignment assignment) throws DistributedStoreException {
+    // the root tablet check must come first, then the metadata check
+    TabletStateStore target = assignment.tablet.isRootTablet() ? new ZooTabletStateStore()
+        : (assignment.tablet.isMeta() ? new RootTabletStateStore() : new MetaDataStateStore());
+    target.setLocations(Collections.singletonList(assignment));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
new file mode 100644
index 0000000..bce6681
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Logger;
+
+public class ZooStore implements DistributedStore {
+
+ private static final Logger log = Logger.getLogger(ZooStore.class);
+
+ String basePath;
+
+ ZooCache cache = new ZooCache();
+
+ public ZooStore(String basePath) throws IOException {
+ if (basePath.endsWith("/"))
+ basePath = basePath.substring(0, basePath.length() - 1);
+ this.basePath = basePath;
+ }
+
+ public ZooStore() throws IOException {
+ this(ZooUtil.getRoot(HdfsZooInstance.getInstance().getInstanceID()));
+ }
+
+ @Override
+ public byte[] get(String path) throws DistributedStoreException {
+ try {
+ return cache.get(relative(path));
+ } catch (Exception ex) {
+ throw new DistributedStoreException(ex);
+ }
+ }
+
+ private String relative(String path) {
+ return basePath + path;
+ }
+
+ @Override
+ public List<String> getChildren(String path) throws DistributedStoreException {
+ try {
+ return cache.getChildren(relative(path));
+ } catch (Exception ex) {
+ throw new DistributedStoreException(ex);
+ }
+ }
+
+ @Override
+ public void put(String path, byte[] bs) throws DistributedStoreException {
+ try {
+ path = relative(path);
+ ZooReaderWriter.getInstance().putPersistentData(path, bs, NodeExistsPolicy.OVERWRITE);
+ cache.clear();
+ log.debug("Wrote " + new String(bs) + " to " + path);
+ } catch (Exception ex) {
+ throw new DistributedStoreException(ex);
+ }
+ }
+
+ @Override
+ public void remove(String path) throws DistributedStoreException {
+ try {
+ log.debug("Removing " + path);
+ path = relative(path);
+ IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+ if (zoo.exists(path))
+ zoo.recursiveDelete(path, NodeMissingPolicy.SKIP);
+ cache.clear();
+ } catch (Exception ex) {
+ throw new DistributedStoreException(ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
new file mode 100644
index 0000000..2c5b709
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.log4j.Logger;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Tablet state store for the root tablet. The future, current and last locations and the write-ahead log entries are read from, and written to, a
+ * {@link DistributedStore} under the {@code RootTable.ZROOT_TABLET_*} paths. Locations are stored as {@code location|session}, encoded and decoded with
+ * the platform default charset.
+ */
+public class ZooTabletStateStore extends TabletStateStore {
+
+  private static final Logger log = Logger.getLogger(ZooTabletStateStore.class);
+  final private DistributedStore store;
+
+  /**
+   * @param store
+   *          the store to read and write the root tablet state with
+   */
+  public ZooTabletStateStore(DistributedStore store) {
+    this.store = store;
+  }
+
+  /**
+   * Creates a state store backed by a default {@link ZooStore}.
+   *
+   * @throws DistributedStoreException
+   *           if the {@link ZooStore} cannot be created
+   */
+  public ZooTabletStateStore() throws DistributedStoreException {
+    try {
+      store = new ZooStore();
+    } catch (IOException ex) {
+      throw new DistributedStoreException(ex);
+    }
+  }
+
+  /**
+   * Returns an iterator that yields exactly one element: the state of the root tablet, read from the store when {@code next()} is called.
+   */
+  @Override
+  public Iterator<TabletLocationState> iterator() {
+    return new Iterator<TabletLocationState>() {
+      boolean finished = false;
+
+      @Override
+      public boolean hasNext() {
+        return !finished;
+      }
+
+      @Override
+      public TabletLocationState next() {
+        // honor the Iterator contract: there is only one element, so a second call must fail instead of re-reading the store
+        if (finished)
+          throw new java.util.NoSuchElementException("The root tablet state has already been returned");
+        finished = true;
+        try {
+          byte[] future = store.get(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
+          byte[] current = store.get(RootTable.ZROOT_TABLET_LOCATION);
+          byte[] last = store.get(RootTable.ZROOT_TABLET_LAST_LOCATION);
+
+          TServerInstance currentSession = null;
+          TServerInstance futureSession = null;
+          TServerInstance lastSession = null;
+
+          if (future != null)
+            futureSession = parse(future);
+
+          if (last != null)
+            lastSession = parse(last);
+
+          if (current != null) {
+            currentSession = parse(current);
+            // a stored current location takes precedence over any future location
+            futureSession = null;
+          }
+          List<Collection<String>> logs = new ArrayList<Collection<String>>();
+          for (String entry : store.getChildren(RootTable.ZROOT_TABLET_WALOGS)) {
+            byte[] logInfo = store.get(RootTable.ZROOT_TABLET_WALOGS + "/" + entry);
+            if (logInfo != null) {
+              MetadataTableUtil.LogEntry logEntry = new MetadataTableUtil.LogEntry();
+              logEntry.fromBytes(logInfo);
+              logs.add(logEntry.logSet);
+              log.debug("root tablet logSet " + logEntry.logSet);
+            }
+          }
+          TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, logs, false);
+          log.debug("Returning root tablet state: " + result);
+          return result;
+        } catch (Exception ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+
+      @Override
+      public void remove() {
+        throw new NotImplementedException();
+      }
+    };
+  }
+
+  /**
+   * Decodes a stored {@code location|session} value.
+   *
+   * @param current
+   *          the bytes read from the store
+   * @return the server instance, or null when the value carries no session part
+   */
+  protected TServerInstance parse(byte[] current) {
+    String str = new String(current);
+    String[] parts = str.split("[|]", 2);
+    HostAndPort address = HostAndPort.fromString(parts[0]);
+    if (parts.length > 1 && parts[1] != null && parts[1].length() > 0) {
+      return new TServerInstance(address, parts[1]);
+    } else {
+      // a 1.2 location specification: DO NOT WANT
+      return null;
+    }
+  }
+
+  /**
+   * Checks that the collection holds exactly one assignment and that it is for the root tablet.
+   *
+   * @return the single root tablet assignment
+   * @throws IllegalArgumentException
+   *           if either check fails
+   */
+  private static Assignment requireRootAssignment(Collection<Assignment> assignments) {
+    if (assignments.size() != 1)
+      throw new IllegalArgumentException("There is only one root tablet");
+    Assignment assignment = assignments.iterator().next();
+    if (assignment.tablet.compareTo(RootTable.EXTENT) != 0)
+      throw new IllegalArgumentException("You can only store the root tablet location");
+    return assignment;
+  }
+
+  /**
+   * Reads the root tablet state and fails if a current location is already stored.
+   *
+   * @return the state that was read
+   * @throws DistributedStoreException
+   *           if the root tablet already has a current location
+   */
+  private TabletLocationState readStateWithoutCurrentLocation() throws DistributedStoreException {
+    TabletLocationState current = iterator().next();
+    if (current.current != null) {
+      throw new DistributedStoreException("Trying to set the root tablet location: it is already set to " + current.current);
+    }
+    return current;
+  }
+
+  /**
+   * Stores the future location of the root tablet.
+   *
+   * @param assignments
+   *          exactly one assignment, for the root tablet
+   * @throws IllegalArgumentException
+   *           if the collection does not hold exactly one root tablet assignment
+   * @throws DistributedStoreException
+   *           if the root tablet already has a current location, or the store fails
+   */
+  @Override
+  public void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException {
+    Assignment assignment = requireRootAssignment(assignments);
+    String value = assignment.server.getLocation() + "|" + assignment.server.getSession();
+    readStateWithoutCurrentLocation();
+    store.put(RootTable.ZROOT_TABLET_FUTURE_LOCATION, value.getBytes());
+  }
+
+  /**
+   * Stores the current and last location of the root tablet and removes its future location. The server must match the stored future location.
+   *
+   * @param assignments
+   *          exactly one assignment, for the root tablet
+   * @throws IllegalArgumentException
+   *           if the collection does not hold exactly one root tablet assignment
+   * @throws DistributedStoreException
+   *           if the root tablet already has a current location, has no future location, has a different future location, or the store fails
+   */
+  @Override
+  public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
+    Assignment assignment = requireRootAssignment(assignments);
+    String value = assignment.server.getLocation() + "|" + assignment.server.getSession();
+    TabletLocationState current = readStateWithoutCurrentLocation();
+    // a missing future location used to surface as a NullPointerException; report it as a store error instead
+    if (current.future == null) {
+      throw new DistributedStoreException("Root tablet has no future location; cannot set location to " + assignment.server);
+    }
+    if (!current.future.equals(assignment.server)) {
+      throw new DistributedStoreException("Root tablet is already assigned to " + current.future);
+    }
+    store.put(RootTable.ZROOT_TABLET_LOCATION, value.getBytes());
+    store.put(RootTable.ZROOT_TABLET_LAST_LOCATION, value.getBytes());
+    // Make the following unnecessary by making the entire update atomic
+    store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
+    log.debug("Put down root tablet location");
+  }
+
+  /**
+   * Removes the current and future location of the root tablet. The last location is left in place.
+   *
+   * @param tablets
+   *          exactly one tablet state, for the root tablet
+   * @throws IllegalArgumentException
+   *           if the collection does not hold exactly one root tablet state
+   * @throws DistributedStoreException
+   *           if the store fails
+   */
+  @Override
+  public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
+    if (tablets.size() != 1)
+      throw new IllegalArgumentException("There is only one root tablet");
+    TabletLocationState tls = tablets.iterator().next();
+    if (tls.extent.compareTo(RootTable.EXTENT) != 0)
+      throw new IllegalArgumentException("You can only store the root tablet location");
+    store.remove(RootTable.ZROOT_TABLET_LOCATION);
+    store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
+    log.debug("unassign root tablet location");
+  }
+
+  @Override
+  public String name() {
+    return "Root Table";
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/CompactionIterators.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/CompactionIterators.java b/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/CompactionIterators.java
new file mode 100644
index 0000000..4f5bf42
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/CompactionIterators.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.tableOps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable holder for an optional start row, an optional end row and a list of iterator settings.
+ * <p>
+ * Serialized form: for each row a presence flag, followed by a length and the raw bytes when the row is present; then the number of iterators followed by
+ * each iterator setting.
+ */
+public class CompactionIterators implements Writable {
+  byte[] startRow;
+  byte[] endRow;
+  List<IteratorSetting> iterators;
+
+  public CompactionIterators(byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) {
+    this.startRow = startRow;
+    this.endRow = endRow;
+    this.iterators = iterators;
+  }
+
+  /**
+   * Creates an instance with no start row, no end row and an empty iterator list.
+   */
+  public CompactionIterators() {
+    this(null, null, Collections.<IteratorSetting> emptyList());
+  }
+
+  /**
+   * Writes a presence flag and, when the row is present, its length and bytes.
+   */
+  private static void writeRow(DataOutput out, byte[] row) throws IOException {
+    boolean present = row != null;
+    out.writeBoolean(present);
+    if (present) {
+      out.writeInt(row.length);
+      out.write(row);
+    }
+  }
+
+  /**
+   * Reads a row written by {@link #writeRow(DataOutput, byte[])}.
+   *
+   * @return the row bytes, or null when the presence flag was false
+   */
+  private static byte[] readRow(DataInput in) throws IOException {
+    if (!in.readBoolean())
+      return null;
+    byte[] row = new byte[in.readInt()];
+    in.readFully(row);
+    return row;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    writeRow(out, startRow);
+    writeRow(out, endRow);
+
+    out.writeInt(iterators.size());
+    for (IteratorSetting setting : iterators) {
+      setting.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    startRow = readRow(in);
+    endRow = readRow(in);
+
+    int count = in.readInt();
+    iterators = new ArrayList<IteratorSetting>(count);
+    for (int remaining = count; remaining > 0; remaining--) {
+      iterators.add(new IteratorSetting(in));
+    }
+  }
+
+  /**
+   * @return the end row as a {@link Text}, or null when no end row is set
+   */
+  public Text getEndRow() {
+    return endRow == null ? null : new Text(endRow);
+  }
+
+  /**
+   * @return the start row as a {@link Text}, or null when no start row is set
+   */
+  public Text getStartRow() {
+    return startRow == null ? null : new Text(startRow);
+  }
+
+  public List<IteratorSetting> getIterators() {
+    return iterators;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
new file mode 100644
index 0000000..9735371
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.time.DateUtils;
+
+/**
+ * Base class for metrics MBeans. It keeps named {@link Metric} statistics in a registry, registers the bean with the platform MBean server and, when the
+ * metrics configuration asks for it, appends every update to a per-day log file.
+ * <p>
+ * The registry is static, so metrics with the same name are shared by every instance of every subclass.
+ */
+public abstract class AbstractMetricsImpl {
+
+  /**
+   * Statistics for one named metric: a sample count, a weighted running average, and the minimum and maximum sample seen. Negative samples are ignored by
+   * the average, minimum and maximum.
+   */
+  public class Metric {
+
+    private long count = 0;
+    private long avg = 0;
+    private long min = 0;
+    private long max = 0;
+    // false until the first valid sample arrives; without it the initial 0 in min would win every comparison
+    private boolean minRecorded = false;
+
+    public long getCount() {
+      return count;
+    }
+
+    public long getAvg() {
+      return avg;
+    }
+
+    /**
+     * @return the smallest sample seen so far, or 0 when no sample has been recorded
+     */
+    public long getMin() {
+      return min;
+    }
+
+    public long getMax() {
+      return max;
+    }
+
+    public void incCount() {
+      count++;
+    }
+
+    /**
+     * Folds the sample into the running average, weighting the previous average 80% and the new sample 20%.
+     */
+    public void addAvg(long a) {
+      if (a < 0)
+        return;
+      avg = (long) ((avg * .8) + (a * .2));
+    }
+
+    /**
+     * Records the sample as the new minimum when it is the first sample or smaller than the current minimum.
+     */
+    public void addMin(long a) {
+      if (a < 0)
+        return;
+      min = minRecorded ? Math.min(min, a) : a;
+      minRecorded = true;
+    }
+
+    public void addMax(long a) {
+      if (a < 0)
+        return;
+      max = Math.max(max, a);
+    }
+
+    @Override
+    public String toString() {
+      return new ToStringBuilder(this).append("count", count).append("average", avg).append("minimum", min).append("maximum", max).toString();
+    }
+
+  }
+
+  static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AbstractMetricsImpl.class);
+
+  private static ConcurrentHashMap<String,Metric> registry = new ConcurrentHashMap<String,Metric>();
+
+  private boolean currentlyLogging = false;
+
+  private File logDir = null;
+
+  private String metricsPrefix = null;
+
+  private Date today = new Date();
+
+  private File logFile = null;
+
+  private FileWriter logWriter = null;
+
+  private SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
+
+  // HH (0-23) rather than hh (1-12): the pattern has no am/pm marker, so 12-hour values would be ambiguous
+  private SimpleDateFormat logFormatter = new SimpleDateFormat("yyyyMMddHHmmssz");
+
+  private MetricsConfiguration config = null;
+
+  public AbstractMetricsImpl() {
+    this.metricsPrefix = getMetricsPrefix();
+    config = new MetricsConfiguration(metricsPrefix);
+  }
+
+  /**
+   * Registers a StandardMBean with the MBean Server
+   *
+   * @param mbean
+   *          the bean to register under {@link #getObjectName()}
+   * @throws IllegalArgumentException
+   *           if {@link #getObjectName()} returns null
+   * @throws Exception
+   *           if registration or log setup fails
+   */
+  public void register(StandardMBean mbean) throws Exception {
+    // Register this object with the MBeanServer
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    if (null == getObjectName())
+      throw new IllegalArgumentException("MBean object name must be set.");
+    mbs.registerMBean(mbean, getObjectName());
+
+    setupLogging();
+  }
+
+  /**
+   * Registers this MBean with the MBean Server
+   *
+   * @throws IllegalArgumentException
+   *           if {@link #getObjectName()} returns null
+   * @throws Exception
+   *           if registration or log setup fails
+   */
+  public void register() throws Exception {
+    // Register this object with the MBeanServer
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    if (null == getObjectName())
+      throw new IllegalArgumentException("MBean object name must be set.");
+    mbs.registerMBean(this, getObjectName());
+    setupLogging();
+  }
+
+  /**
+   * Puts a fresh {@link Metric} into the registry under the given name, replacing any existing one.
+   */
+  public void createMetric(String name) {
+    registry.put(name, new Metric());
+  }
+
+  /**
+   * @return the metric registered under the given name, or null if there is none
+   */
+  public Metric getMetric(String name) {
+    return registry.get(name);
+  }
+
+  public long getMetricCount(String name) {
+    return registry.get(name).getCount();
+  }
+
+  public long getMetricAvg(String name) {
+    return registry.get(name).getAvg();
+  }
+
+  public long getMetricMin(String name) {
+    return registry.get(name).getMin();
+  }
+
+  public long getMetricMax(String name) {
+    return registry.get(name).getMax();
+  }
+
+  /**
+   * Starts file logging when it is not already active and the configuration enables "&lt;prefix&gt;.logging". The log directory comes from the
+   * "logging.dir" setting; when that setting is absent, logging is marked active but no file is opened.
+   */
+  private void setupLogging() throws IOException {
+    if (null == config.getMetricsConfiguration())
+      return;
+    // If we are already logging, then return
+    if (!currentlyLogging && config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) {
+      // Check to see if directory exists, else make it
+      String mDir = config.getMetricsConfiguration().getString("logging.dir");
+      if (null != mDir) {
+        File dir = new File(mDir);
+        if (!dir.isDirectory())
+          dir.mkdir();
+        logDir = dir;
+        // Create new log file
+        startNewLog();
+      }
+      currentlyLogging = true;
+    }
+  }
+
+  /**
+   * Closes the current log file, if any, and opens the log file for {@code today} in append mode.
+   */
+  private void startNewLog() throws IOException {
+    if (null != logWriter) {
+      // drop the reference first so a closed writer is never left behind if anything below fails
+      FileWriter previous = logWriter;
+      logWriter = null;
+      // close() flushes before closing
+      previous.close();
+    }
+    logFile = new File(logDir, metricsPrefix + "-" + formatter.format(today) + ".log");
+    if (!logFile.exists()) {
+      if (!logFile.createNewFile()) {
+        log.error("Unable to create new log file");
+        currentlyLogging = false;
+        return;
+      }
+    }
+    logWriter = new FileWriter(logFile, true);
+  }
+
+  /**
+   * Appends one line with the current state of the named metric to the log file, rolling over to a new file when the day has changed. Does nothing when no
+   * log file is open.
+   */
+  private void writeToLog(String name) throws IOException {
+    if (null == logWriter)
+      return;
+    // Increment the date if we have to
+    Date now = new Date();
+    if (!DateUtils.isSameDay(today, now)) {
+      today = now;
+      startNewLog();
+      // the rollover may have failed to open a new file
+      if (null == logWriter)
+        return;
+    }
+    logWriter.append(logFormatter.format(now)).append(" Metric: ").append(name).append(": ").append(registry.get(name).toString()).append("\n");
+  }
+
+  /**
+   * Records one sample for the named metric and, depending on the configuration, starts, stops or writes to the metrics log. Does nothing when metrics are
+   * disabled.
+   *
+   * @param name
+   *          the name of a metric previously created with {@link #createMetric(String)}
+   * @param time
+   *          the sample value
+   */
+  public void add(String name, long time) {
+    if (isEnabled()) {
+      Metric metric = registry.get(name);
+      metric.incCount();
+      metric.addAvg(time);
+      metric.addMin(time);
+      metric.addMax(time);
+      boolean shouldLog = config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false);
+      if (!currentlyLogging && shouldLog) {
+        // If we are not currently logging and should be, then initialize
+        try {
+          setupLogging();
+        } catch (IOException ioe) {
+          log.error("Error setting up log", ioe);
+        }
+      } else if (currentlyLogging && !shouldLog) {
+        // if we are currently logging and shouldn't be, then close logs
+        try {
+          // logging can be active without a writer when no log directory is configured
+          if (null != logWriter) {
+            logWriter.flush();
+            logWriter.close();
+          }
+        } catch (Exception e) {
+          log.error("Error stopping metrics logging", e);
+        } finally {
+          // never keep a writer around that may be closed or broken
+          logWriter = null;
+          logFile = null;
+        }
+        currentlyLogging = false;
+      }
+      if (currentlyLogging) {
+        try {
+          writeToLog(name);
+        } catch (IOException ioe) {
+          log.error("Error writing to metrics log", ioe);
+        }
+      }
+    }
+  }
+
+  public boolean isEnabled() {
+    return config.isEnabled();
+  }
+
+  protected abstract ObjectName getObjectName();
+
+  protected abstract String getMetricsPrefix();
+
+  /**
+   * Closes the log file, ignoring any error, when this object is garbage collected.
+   */
+  @Override
+  protected void finalize() {
+    if (null != logWriter) {
+      try {
+        logWriter.close();
+      } catch (Exception e) {
+        // do nothing
+      } finally {
+        logWriter = null;
+      }
+    }
+    logFile = null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsConfiguration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsConfiguration.java
new file mode 100644
index 0000000..446a548
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsConfiguration.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import java.io.File;
+import java.util.Iterator;
+
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.commons.configuration.AbstractFileConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.EnvironmentConfiguration;
+import org.apache.commons.configuration.SystemConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.configuration.event.ConfigurationEvent;
+import org.apache.commons.configuration.event.ConfigurationListener;
+import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+public class MetricsConfiguration {
+
+ private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(MetricsConfiguration.class);
+
+ private static final String metricsFileName = "accumulo-metrics.xml";
+
+ private static boolean alreadyWarned = false;
+
+ private boolean notFound = false;
+
+ private int notFoundCount = 0;
+
+ private static SystemConfiguration sysConfig = null;
+
+ private static EnvironmentConfiguration envConfig = null;
+
+ private XMLConfiguration xConfig = null;
+
+ private Configuration config = null;
+
+ private final Object lock = new Object();
+
+ private boolean needsReloading = false;
+
+ private long lastCheckTime = 0;
+
+ private static long CONFIG_FILE_CHECK_INTERVAL = 1000 * 60 * 10; // 10 minutes
+
+ private static int CONFIG_FILE_CHECK_COUNTER = 100;
+
+ public final static long CONFIG_FILE_RELOAD_DELAY = 60000;
+
+ private MetricsConfigWatcher watcher = null;
+
+ private boolean enabled = false;
+
+ private String enabledName = null;
+
+ /**
+ * Background thread that pokes the XMLConfiguration file to see if it has changed. If it has, then the Configuration Listener will get an event.
+ *
+ */
+ private class MetricsConfigWatcher extends Daemon {
+ public MetricsConfigWatcher() {}
+
+ public void run() {
+ while (this.isAlive()) {
+ try {
+ Thread.sleep(MetricsConfiguration.CONFIG_FILE_RELOAD_DELAY);
+ } catch (InterruptedException ie) {
+ // Do Nothing
+ }
+ xConfig.getBoolean("master.enabled");
+ }
+ }
+ }
+
+ /**
+ * ConfigurationListener that sets a flag to reload the XML config file
+ */
+ private class MetricsConfigListener implements ConfigurationListener {
+ public void configurationChanged(ConfigurationEvent event) {
+ if (event.getType() == AbstractFileConfiguration.EVENT_RELOAD)
+ needsReloading = true;
+ }
+ }
+
+ public MetricsConfiguration(String name) {
+ // We are going to store the "enabled" parameter for this
+ // name as a shortcut so that it doesn't have to be looked
+ // up in the configuration so much.
+ this.enabledName = name + ".enabled";
+ getMetricsConfiguration();
+ }
+
+ public Configuration getEnvironmentConfiguration() {
+ if (null == envConfig)
+ envConfig = new EnvironmentConfiguration();
+ return envConfig;
+ }
+
+ public Configuration getSystemConfiguration() {
+ if (null == sysConfig)
+ sysConfig = new SystemConfiguration();
+ return sysConfig;
+ }
+
+ public Configuration getMetricsConfiguration() {
+ if (notFound) {
+ if (notFoundCount <= CONFIG_FILE_CHECK_COUNTER) {
+ return null;
+ } else if ((notFoundCount > CONFIG_FILE_CHECK_COUNTER) && ((System.currentTimeMillis() - lastCheckTime) > CONFIG_FILE_CHECK_INTERVAL)) {
+ notFoundCount = 0;
+ lastCheckTime = System.currentTimeMillis();
+ notFound = false;
+ } else {
+ notFoundCount++;
+ }
+ }
+ if (null == config || needsReloading)
+ synchronized (lock) {
+ if (needsReloading) {
+ loadConfiguration();
+ } else if (null == config) {
+ loadConfiguration();
+ }
+ needsReloading = false;
+ }
+ return config;
+ }
+
+ private void loadConfiguration() {
+ // Check to see if ACCUMULO_HOME environment variable is set.
+ String ACUHOME = getEnvironmentConfiguration().getString("ACCUMULO_CONF_DIR");
+ if (null != ACUHOME) {
+ // Try to load the metrics properties file
+ File mFile = new File(ACUHOME, metricsFileName);
+ if (mFile.exists()) {
+ if (log.isDebugEnabled())
+ log.debug("Loading config file: " + mFile.getAbsolutePath());
+ try {
+ xConfig = new XMLConfiguration(mFile);
+ xConfig.append(getEnvironmentConfiguration());
+ xConfig.addConfigurationListener(new MetricsConfigListener());
+ xConfig.setReloadingStrategy(new FileChangedReloadingStrategy());
+
+ // Start a background Thread that checks a property from the XMLConfiguration
+ // every so often to force the FileChangedReloadingStrategy to fire.
+ if (null == watcher || !watcher.isAlive()) {
+ watcher = new MetricsConfigWatcher();
+ watcher.start();
+ }
+ notFound = false;
+ alreadyWarned = false;
+ } catch (ConfigurationException ce) {
+ log.error("Error reading accumulo-metrics.xml file.");
+ notFound = true;
+ return;
+ }
+ } else {
+ if (!alreadyWarned)
+ log.warn("Unable to find metrics file: " + mFile.getAbsolutePath());
+ alreadyWarned = true;
+ notFound = true;
+ return;
+ }
+ } else {
+ if (!alreadyWarned)
+ log.warn("ACCUMULO_CONF_DIR variable not found in environment. Metrics collection will be disabled.");
+ alreadyWarned = true;
+ notFound = true;
+ return;
+ }
+ if (xConfig != null) {
+ config = xConfig.interpolatedConfiguration();
+ // set the enabled boolean from the configuration
+ enabled = config.getBoolean(enabledName);
+ if (log.isDebugEnabled())
+ log.debug("Metrics collection enabled=" + enabled);
+ } else {
+ enabled = false;
+ }
+
+ }
+
+ public boolean isEnabled() {
+ // Force reload if necessary
+ if (null == getMetricsConfiguration())
+ return false;
+ return enabled;
+ }
+
+ public static String toStringValue(Configuration config) {
+ ToStringBuilder tsb = new ToStringBuilder(MetricsConfiguration.class);
+ Iterator<?> keys = config.getKeys();
+ while (keys.hasNext()) {
+ tsb.append("\n");
+ String k = (String) keys.next();
+ Object v = config.getString(k);
+ if (null == v)
+ v = config.getList(k);
+ tsb.append(k, v.toString());
+ }
+ return tsb.toString();
+ }
+
+ public static void main(String[] args) throws Exception {
+ MetricsConfiguration mc = new MetricsConfiguration("master");
+ while (true) {
+ // System.out.println(MetricsConfiguration.toStringValue(getSystemConfiguration()));
+ System.out.println("------------------------------------------------------------------------------------------------");
+ // System.out.println(MetricsConfiguration.toStringValue());
+ long t1 = System.currentTimeMillis();
+ System.out.println(mc.isEnabled() + " took: " + (System.currentTimeMillis() - t1));
+ Thread.sleep(1000);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java
new file mode 100644
index 0000000..cf7d3d5
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import javax.management.ObjectName;
+
+
+/**
+ * {@link ThriftMetricsMBean} implementation that publishes the {@code idle} and {@code execute} metrics under the {@code "thrift"} prefix.
+ */
+public class ThriftMetrics extends AbstractMetricsImpl implements ThriftMetricsMBean {
+
+  static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(ThriftMetrics.class);
+
+  private static final String METRICS_PREFIX = "thrift";
+
+  // MBean name assembled in the constructor; remains null when the name could not be built.
+  private final ObjectName mbeanName;
+
+  /**
+   * Creates the two metrics and builds the MBean object name from the given identifiers.
+   * <p>
+   * A failure to build the object name is logged rather than propagated, in which case {@link #getObjectName()} returns {@code null}.
+   *
+   * @param serverName
+   *          value used for the {@code service} key of the object name
+   * @param threadName
+   *          value used for the {@code instance} key of the object name
+   */
+  public ThriftMetrics(String serverName, String threadName) {
+    super();
+    reset();
+    ObjectName built = null;
+    try {
+      String spec = "accumulo.server.metrics:service=" + serverName + ",name=ThriftMetricsMBean,instance=" + threadName;
+      built = new ObjectName(spec);
+    } catch (Exception e) {
+      log.error("Exception setting MBean object name", e);
+    }
+    mbeanName = built;
+  }
+
+  @Override
+  protected ObjectName getObjectName() {
+    return mbeanName;
+  }
+
+  @Override
+  protected String getMetricsPrefix() {
+    return METRICS_PREFIX;
+  }
+
+  /**
+   * Creates the {@code idle} metric and then the {@code execute} metric.
+   */
+  @Override
+  public void reset() {
+    createMetric(idle);
+    createMetric(execute);
+  }
+
+  // ---- execute metric ----
+
+  @Override
+  public long getExecutionAvgTime() {
+    return getMetricAvg(execute);
+  }
+
+  @Override
+  public long getExecutionCount() {
+    return getMetricCount(execute);
+  }
+
+  @Override
+  public long getExecutionMaxTime() {
+    return getMetricMax(execute);
+  }
+
+  @Override
+  public long getExecutionMinTime() {
+    return getMetricMin(execute);
+  }
+
+  // ---- idle metric ----
+
+  @Override
+  public long getIdleAvgTime() {
+    return getMetricAvg(idle);
+  }
+
+  @Override
+  public long getIdleCount() {
+    return getMetricCount(idle);
+  }
+
+  @Override
+  public long getIdleMaxTime() {
+    return getMetricMax(idle);
+  }
+
+  @Override
+  public long getIdleMinTime() {
+    return getMetricMin(idle);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetricsMBean.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetricsMBean.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetricsMBean.java
new file mode 100644
index 0000000..11f94c2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetricsMBean.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+/**
+ * Management interface exposing count, minimum, maximum and average figures for the {@code idle} and {@code execute} metrics.
+ */
+public interface ThriftMetricsMBean {
+
+  /** Name of the idle metric. */
+  String idle = "idle";
+
+  /** Name of the execute metric. */
+  String execute = "execute";
+
+  // ---- idle metric ----
+
+  /** @return the count recorded for the {@code idle} metric */
+  long getIdleCount();
+
+  /** @return the minimum time recorded for the {@code idle} metric */
+  long getIdleMinTime();
+
+  /** @return the maximum time recorded for the {@code idle} metric */
+  long getIdleMaxTime();
+
+  /** @return the average time recorded for the {@code idle} metric */
+  long getIdleAvgTime();
+
+  // ---- execute metric ----
+
+  /** @return the count recorded for the {@code execute} metric */
+  long getExecutionCount();
+
+  /** @return the minimum time recorded for the {@code execute} metric */
+  long getExecutionMinTime();
+
+  /** @return the maximum time recorded for the {@code execute} metric */
+  long getExecutionMaxTime();
+
+  /** @return the average time recorded for the {@code execute} metric */
+  long getExecutionAvgTime();
+
+  /** Resets the metrics. */
+  void reset();
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/monitor/DedupedLogEvent.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/monitor/DedupedLogEvent.java b/server/base/src/main/java/org/apache/accumulo/server/monitor/DedupedLogEvent.java
new file mode 100644
index 0000000..4acb1a9
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/monitor/DedupedLogEvent.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.monitor;
+
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * Wraps a log4j {@link LoggingEvent} together with the number of times an equivalent event has been seen.
+ * <p>
+ * The textual identity of an event is {@code application:level:message}, where {@code application} is the event's {@code "application"} MDC value. That
+ * text is what {@link #toString()} returns and what {@link #hashCode()} is derived from.
+ */
+public class DedupedLogEvent {
+
+  private final LoggingEvent event;
+  private int count = 0;
+  // Cached hash of the event id; -1 means "not computed yet".
+  private int hash = -1;
+
+  /**
+   * Wraps an event that has been seen once.
+   *
+   * @param event
+   *          the logging event to wrap
+   */
+  public DedupedLogEvent(LoggingEvent event) {
+    this(event, 1);
+  }
+
+  /**
+   * Wraps an event with an explicit occurrence count.
+   *
+   * @param event
+   *          the logging event to wrap
+   * @param count
+   *          how many times the event has been seen
+   */
+  public DedupedLogEvent(LoggingEvent event, int count) {
+    this.event = event;
+    this.count = count;
+  }
+
+  /**
+   * @return the wrapped logging event
+   */
+  public LoggingEvent getEvent() {
+    return event;
+  }
+
+  /**
+   * @return the occurrence count
+   */
+  public int getCount() {
+    return count;
+  }
+
+  /**
+   * @param count
+   *          the new occurrence count
+   */
+  public void setCount(int count) {
+    this.count = count;
+  }
+
+  /**
+   * Builds the {@code application:level:message} id of the wrapped event.
+   * <p>
+   * Each part is converted with {@link String#valueOf(Object)}, so a missing MDC value or a null message becomes the text {@code "null"} instead of
+   * raising a {@link NullPointerException}.
+   */
+  private String eventId() {
+    return String.valueOf(event.getMDC("application")) + ":" + String.valueOf(event.getLevel()) + ":" + String.valueOf(event.getMessage());
+  }
+
+  /**
+   * Returns the hash of the event id, computing it on first use and caching it afterwards.
+   */
+  @Override
+  public int hashCode() {
+    if (hash == -1) {
+      hash = eventId().hashCode();
+    }
+    return hash;
+  }
+
+  /**
+   * Two instances are equal when their wrapped events are equal according to the event's own {@code equals}.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof DedupedLogEvent)
+      return this.event.equals(((DedupedLogEvent) obj).event);
+    return false;
+  }
+
+  /**
+   * @return the {@code application:level:message} id of the wrapped event
+   */
+  @Override
+  public String toString() {
+    return eventId();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java b/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
new file mode 100644
index 0000000..fd83e97
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.monitor;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.log4j.net.SocketNode;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * Hijack log4j and capture log events for display.
+ * <p>
+ * Captured events are de-duplicated by the text of {@link DedupedLogEvent#toString()} and at most {@code MAX_LOGS} distinct entries are retained; the
+ * least recently touched entry is evicted first. Events without a non-empty {@code "application"} MDC value are ignored.
+ */
+public class LogService extends org.apache.log4j.AppenderSkeleton {
+
+  private static final Logger log = Logger.getLogger(LogService.class);
+
+  /**
+   * Reads logging events forwarded to us over the net. Each accepted connection is handed to a log4j {@link SocketNode} running on its own
+   * {@link Daemon} thread.
+   */
+  static class SocketServer implements Runnable {
+    private final ServerSocket server;
+
+    /**
+     * Binds the listening socket.
+     *
+     * @param port
+     *          the port to listen on
+     * @throws RuntimeException
+     *           wrapping the {@link IOException} if the socket cannot be opened
+     */
+    public SocketServer(int port) {
+      try {
+        server = new ServerSocket(port);
+      } catch (IOException io) {
+        throw new RuntimeException(io);
+      }
+    }
+
+    /**
+     * @return the port the listening socket is bound to
+     */
+    public int getLocalPort() {
+      return server.getLocalPort();
+    }
+
+    /**
+     * Accepts connections until the socket fails, then logs the failure and releases the listening socket.
+     */
+    @Override
+    public void run() {
+      try {
+        while (true) {
+          log.debug("Waiting for log message senders");
+          Socket socket = server.accept();
+          log.debug("Got a new connection");
+          Thread t = new Daemon(new SocketNode(socket, LogManager.getLoggerRepository()));
+          t.start();
+        }
+      } catch (IOException io) {
+        log.error(io, io);
+      } finally {
+        // The accept loop only ends on failure; do not leave the port bound afterwards.
+        close();
+      }
+    }
+
+    /**
+     * Closes the listening socket, logging (not propagating) any failure to do so.
+     */
+    void close() {
+      try {
+        server.close();
+      } catch (IOException io) {
+        log.warn("Unable to close log listener socket", io);
+      }
+    }
+  }
+
+  /**
+   * Opens the log listener socket, records the port it is bound to in ZooKeeper, and starts the accept loop on a daemon thread.
+   * <p>
+   * Any failure is logged at info level and not propagated. If the failure happens after the socket was opened, the socket is closed again so the
+   * port is not left bound with nothing accepting on it.
+   *
+   * @param conf
+   *          configuration supplying {@link Property#MONITOR_LOG4J_PORT}
+   * @param instanceId
+   *          instance id used to locate the ZooKeeper root
+   */
+  public static void startLogListener(AccumuloConfiguration conf, String instanceId) {
+    SocketServer server = null;
+    boolean started = false;
+    try {
+      server = new SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT));
+      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instanceId) + Constants.ZMONITOR_LOG4J_PORT,
+          Integer.toString(server.getLocalPort()).getBytes(), NodeExistsPolicy.OVERWRITE);
+      new Daemon(server).start();
+      started = true;
+    } catch (Throwable t) {
+      log.info("Unable to listen to cluster-wide ports", t);
+    } finally {
+      if (!started && server != null)
+        server.close();
+    }
+  }
+
+  static private LogService instance = null;
+
+  /**
+   * Returns the most recently constructed instance, creating one if none exists yet.
+   */
+  synchronized public static LogService getInstance() {
+    // The constructor registers the new object as the shared instance.
+    if (instance == null)
+      return new LogService();
+    return instance;
+  }
+
+  private static final int MAX_LOGS = 50;
+
+  // Access-ordered map that drops its eldest entry once it holds more than MAX_LOGS entries.
+  private final LinkedHashMap<String,DedupedLogEvent> events = new LinkedHashMap<String,DedupedLogEvent>(MAX_LOGS + 1, 0.75f, true) {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    protected boolean removeEldestEntry(Map.Entry<String,DedupedLogEvent> eldest) {
+      return size() > MAX_LOGS;
+    }
+  };
+
+  /**
+   * Creates the appender and registers it as the instance returned by {@link #getInstance()}.
+   */
+  public LogService() {
+    synchronized (LogService.class) {
+      instance = this;
+    }
+  }
+
+  /**
+   * Records the event, bumping the count if an event with the same key is already held. Events whose {@code "application"} MDC value is missing or
+   * empty are dropped.
+   */
+  @Override
+  synchronized protected void append(LoggingEvent ev) {
+    Object application = ev.getMDC("application");
+    if (application == null || application.toString().isEmpty())
+      return;
+
+    DedupedLogEvent dev = new DedupedLogEvent(ev);
+    String key = dev.toString();
+
+    // if event is present, increase the count; removing and re-inserting keeps the newest event object
+    DedupedLogEvent oldDev = events.remove(key);
+    if (oldDev != null)
+      dev.setCount(oldDev.getCount() + 1);
+    events.put(key, dev);
+  }
+
+  /**
+   * Marks the appender closed and discards the captured events.
+   * <p>
+   * The map itself is kept, so {@link #getEvents()} and {@link #clear()} remain safe to call afterwards, and the inherited {@code closed} flag makes
+   * the base class reject further appends.
+   */
+  @Override
+  public synchronized void close() {
+    closed = true;
+    events.clear();
+  }
+
+  @Override
+  public synchronized void doAppend(LoggingEvent event) {
+    super.doAppend(event);
+  }
+
+  @Override
+  public boolean requiresLayout() {
+    return false;
+  }
+
+  /**
+   * @return a snapshot copy of the captured events, in the map's current iteration order
+   */
+  synchronized public List<DedupedLogEvent> getEvents() {
+    return new ArrayList<DedupedLogEvent>(events.values());
+  }
+
+  /**
+   * Discards all captured events.
+   */
+  synchronized public void clear() {
+    events.clear();
+  }
+}