You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:56:26 UTC

[47/54] [partial] ACCUMULO-658, ACCUMULO-656 Split server into separate modules

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
new file mode 100644
index 0000000..bae614d
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class MetaDataTableScanner implements Iterator<TabletLocationState> {
+  private static final Logger log = Logger.getLogger(MetaDataTableScanner.class);
+  
+  BatchScanner mdScanner = null;
+  Iterator<Entry<Key,Value>> iter;
+  
+  public MetaDataTableScanner(Instance instance, Credentials credentials, Range range, CurrentState state) {
+    this(instance, credentials, range, state, MetadataTable.NAME);
+  }
+  
+  MetaDataTableScanner(Instance instance, Credentials credentials, Range range, CurrentState state, String tableName) {
+    // scan over metadata table, looking for tablets in the wrong state based on the live servers and online tables
+    try {
+      Connector connector = instance.getConnector(credentials.getPrincipal(), credentials.getToken());
+      mdScanner = connector.createBatchScanner(tableName, Authorizations.EMPTY, 8);
+      configureScanner(mdScanner, state);
+      mdScanner.setRanges(Collections.singletonList(range));
+      iter = mdScanner.iterator();
+    } catch (Exception ex) {
+      if (mdScanner != null)
+        mdScanner.close();
+      throw new RuntimeException(ex);
+    }
+  }
+  
+  static public void configureScanner(ScannerBase scanner, CurrentState state) {
+    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+    scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+    scanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
+    scanner.fetchColumnFamily(LogColumnFamily.NAME);
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class));
+    IteratorSetting tabletChange = new IteratorSetting(1001, "tabletChange", TabletStateChangeIterator.class);
+    if (state != null) {
+      TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers());
+      TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables());
+      TabletStateChangeIterator.setMerges(tabletChange, state.merges());
+    }
+    scanner.addScanIterator(tabletChange);
+  }
+  
+  public MetaDataTableScanner(Instance instance, Credentials credentials, Range range) {
+    this(instance, credentials, range, MetadataTable.NAME);
+  }
+  
+  public MetaDataTableScanner(Instance instance, Credentials credentials, Range range, String tableName) {
+    this(instance, credentials, range, null, tableName);
+  }
+  
+  public void close() {
+    if (iter != null) {
+      mdScanner.close();
+      iter = null;
+    }
+  }
+  
+  @Override
+  public void finalize() {
+    close();
+  }
+  
+  @Override
+  public boolean hasNext() {
+    if (iter == null)
+      return false;
+    boolean result = iter.hasNext();
+    if (!result) {
+      close();
+    }
+    return result;
+  }
+  
+  @Override
+  public TabletLocationState next() {
+    try {
+      return fetch();
+    } catch (RuntimeException ex) {
+      // something is wrong with the records in the !METADATA table, just skip over it
+      log.error(ex, ex);
+      mdScanner.close();
+      return null;
+    }
+  }
+  
+  public static TabletLocationState createTabletLocationState(Key k, Value v) throws IOException, BadLocationStateException {
+    final SortedMap<Key,Value> decodedRow = WholeRowIterator.decodeRow(k, v);
+    KeyExtent extent = null;
+    TServerInstance future = null;
+    TServerInstance current = null;
+    TServerInstance last = null;
+    List<Collection<String>> walogs = new ArrayList<Collection<String>>();
+    boolean chopped = false;
+    
+    for (Entry<Key,Value> entry : decodedRow.entrySet()) {
+      Key key = entry.getKey();
+      Text row = key.getRow();
+      Text cf = key.getColumnFamily();
+      Text cq = key.getColumnQualifier();
+      
+      if (cf.compareTo(TabletsSection.FutureLocationColumnFamily.NAME) == 0) {
+        TServerInstance location = new TServerInstance(entry.getValue(), cq);
+        if (future != null) {
+          throw new BadLocationStateException("found two assignments for the same extent " + key.getRow() + ": " + future + " and " + location);
+        }
+        future = location;
+      } else if (cf.compareTo(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) {
+        TServerInstance location = new TServerInstance(entry.getValue(), cq);
+        if (current != null) {
+          throw new BadLocationStateException("found two locations for the same extent " + key.getRow() + ": " + current + " and " + location);
+        }
+        current = location;
+      } else if (cf.compareTo(LogColumnFamily.NAME) == 0) {
+        String[] split = entry.getValue().toString().split("\\|")[0].split(";");
+        walogs.add(Arrays.asList(split));
+      } else if (cf.compareTo(TabletsSection.LastLocationColumnFamily.NAME) == 0) {
+        TServerInstance location = new TServerInstance(entry.getValue(), cq);
+        if (last != null) {
+          throw new BadLocationStateException("found two last locations for the same extent " + key.getRow() + ": " + last + " and " + location);
+        }
+        last = new TServerInstance(entry.getValue(), cq);
+      } else if (cf.compareTo(ChoppedColumnFamily.NAME) == 0) {
+        chopped = true;
+      } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.equals(cf, cq)) {
+        extent = new KeyExtent(row, entry.getValue());
+      }
+    }
+    if (extent == null) {
+      log.warn("No prev-row for key extent: " + decodedRow);
+      return null;
+    }
+    return new TabletLocationState(extent, future, current, last, walogs, chopped);
+  }
+  
+  private TabletLocationState fetch() {
+    try {
+      Entry<Key,Value> e = iter.next();
+      return createTabletLocationState(e.getKey(), e.getValue());
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    } catch (BadLocationStateException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+  
+  @Override
+  public void remove() {
+    throw new RuntimeException("Unimplemented");
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java
new file mode 100644
index 0000000..d8220fe
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/RootTabletStateStore.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.util.Iterator;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.master.state.CurrentState;
+import org.apache.accumulo.server.master.state.MetaDataTableScanner;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+
+public class RootTabletStateStore extends MetaDataStateStore {
+  
+  public RootTabletStateStore(Instance instance, Credentials credentials, CurrentState state) {
+    super(instance, credentials, state, RootTable.NAME);
+  }
+  
+  public RootTabletStateStore() {
+    super(RootTable.NAME);
+  }
+  
+  @Override
+  public Iterator<TabletLocationState> iterator() {
+    return new MetaDataTableScanner(instance, credentials, MetadataSchema.TabletsSection.getRange(), state, RootTable.NAME);
+  }
+  
+  @Override
+  public String name() {
+    return "Metadata Tablets";
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
new file mode 100644
index 0000000..4de64a8
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.io.Serializable;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * A tablet is assigned to a tablet server at the given address as long as it is alive and well. When the tablet server is restarted, the instance information
+ * it advertises will change. Therefore tablet assignments can be considered out-of-date if the tablet server instance information has been changed.
+ * <p>
+ * Instances are immutable. Equality, ordering and hashing are all based on the string form {@code host:port[session]}.
+ */
+public class TServerInstance implements Comparable<TServerInstance>, Serializable {
+  
+  private static final long serialVersionUID = 1L;
+  
+  // Explicit charset so that byte <-> String conversion of addresses does not depend on the platform default
+  private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");
+  
+  private final HostAndPort location;
+  private final String session;
+  // computed once in the constructor; backs toString(), compareTo(), equals() and hashCode()
+  private final String cachedStringRepresentation;
+  
+  /**
+   * @param address
+   *          host and port of the tablet server
+   * @param session
+   *          session identifier of the tablet server
+   */
+  public TServerInstance(HostAndPort address, String session) {
+    this.location = address;
+    this.session = session;
+    this.cachedStringRepresentation = hostPort() + "[" + session + "]";
+  }
+  
+  /**
+   * @param session
+   *          numeric session identifier; stored as its hexadecimal string
+   */
+  public TServerInstance(HostAndPort address, long session) {
+    this(address, Long.toHexString(session));
+  }
+  
+  /**
+   * @param address
+   *          address text parsed with {@link AddressUtil#parseAddress}
+   * @param session
+   *          numeric session identifier; stored as its hexadecimal string
+   */
+  public TServerInstance(String address, long session) {
+    this(AddressUtil.parseAddress(address), Long.toHexString(session));
+  }
+  
+  /**
+   * Builds an instance from a location entry: the value holds the address, the column qualifier holds the session.
+   */
+  public TServerInstance(Value address, Text session) {
+    this(AddressUtil.parseAddress(new String(address.get(), UTF8)), session.toString());
+  }
+  
+  /**
+   * Adds this server as the current location to the given mutation.
+   */
+  public void putLocation(Mutation m) {
+    m.put(TabletsSection.CurrentLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
+  }
+  
+  /**
+   * Adds this server as the future location to the given mutation.
+   */
+  public void putFutureLocation(Mutation m) {
+    m.put(TabletsSection.FutureLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
+  }
+  
+  /**
+   * Adds this server as the last location to the given mutation.
+   */
+  public void putLastLocation(Mutation m) {
+    m.put(TabletsSection.LastLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue());
+  }
+  
+  /**
+   * Adds a delete of this server's last-location entry to the given mutation.
+   */
+  public void clearLastLocation(Mutation m) {
+    m.putDelete(TabletsSection.LastLocationColumnFamily.NAME, asColumnQualifier());
+  }
+  
+  /**
+   * Orders instances by their string form.
+   */
+  @Override
+  public int compareTo(TServerInstance other) {
+    if (this == other)
+      return 0;
+    return this.toString().compareTo(other.toString());
+  }
+  
+  @Override
+  public int hashCode() {
+    return toString().hashCode();
+  }
+  
+  /**
+   * @return true if {@code obj} is a {@link TServerInstance} with the same string form
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof TServerInstance) {
+      return compareTo((TServerInstance) obj) == 0;
+    }
+    return false;
+  }
+  
+  /**
+   * @return the cached {@code host:port[session]} form
+   */
+  @Override
+  public String toString() {
+    return cachedStringRepresentation;
+  }
+  
+  public int port() {
+    return getLocation().getPort();
+  }
+  
+  public String host() {
+    return getLocation().getHostText();
+  }
+  
+  public String hostPort() {
+    return getLocation().toString();
+  }
+  
+  /**
+   * @return the session as a column qualifier
+   */
+  public Text asColumnQualifier() {
+    return new Text(this.getSession());
+  }
+  
+  /**
+   * @return the address as a UTF-8 encoded value
+   */
+  public Value asMutationValue() {
+    return new Value(getLocation().toString().getBytes(UTF8));
+  }
+  
+  public HostAndPort getLocation() {
+    return location;
+  }
+  
+  public String getSession() {
+    return session;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
new file mode 100644
index 0000000..bcfaead
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.KeyExtent;
+
+/**
+ * Location information for a single tablet. A future location is recorded when the tablet is assigned and a current location once it has been opened; the
+ * two must never be present at the same time.
+ * 
+ * Every tablet server carries a unique session id, so a location naming a server whose session differs from the live one is known to be stale.
+ */
+public class TabletLocationState {
+  
+  /**
+   * Raised when the location entries of a tablet contradict each other.
+   */
+  static public class BadLocationStateException extends Exception {
+    private static final long serialVersionUID = 1L;
+    
+    BadLocationStateException(String msg) {
+      super(msg);
+    }
+  }
+  
+  final public KeyExtent extent;
+  final public TServerInstance future;
+  final public TServerInstance current;
+  final public TServerInstance last;
+  final public Collection<Collection<String>> walogs;
+  final public boolean chopped;
+  
+  /**
+   * @param walogs
+   *          write-ahead log groups for the tablet; null is replaced by an empty collection
+   * @throws BadLocationStateException
+   *           if both a current and a future location are supplied
+   */
+  public TabletLocationState(KeyExtent extent, TServerInstance future, TServerInstance current, TServerInstance last, Collection<Collection<String>> walogs,
+      boolean chopped) throws BadLocationStateException {
+    this.extent = extent;
+    this.future = future;
+    this.current = current;
+    this.last = last;
+    this.walogs = (walogs != null) ? walogs : Collections.<Collection<String>> emptyList();
+    this.chopped = chopped;
+    // all fields are set by now, so the object can safely appear in the message
+    final boolean assignedAndHosted = current != null && future != null;
+    if (assignedAndHosted) {
+      throw new BadLocationStateException(extent + " is both assigned and hosted, which should never happen: " + this);
+    }
+  }
+  
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder();
+    text.append(extent).append("@(").append(future).append(",").append(current).append(",").append(last).append(")");
+    if (chopped) {
+      text.append(" chopped");
+    }
+    return text.toString();
+  }
+  
+  /**
+   * @return the current location if set, otherwise the future location if set, otherwise the last location (which may be null)
+   */
+  public TServerInstance getServer() {
+    if (current != null) {
+      return current;
+    }
+    if (future != null) {
+      return future;
+    }
+    return last;
+  }
+  
+  /**
+   * Classifies the tablet against the set of live tablet servers.
+   * 
+   * @param liveServers
+   *          the servers currently considered alive
+   * @return the state derived from the current/future location and whether that server is live
+   */
+  public TabletState getState(Set<TServerInstance> liveServers) {
+    final TServerInstance server = getServer();
+    if (server == null) {
+      return TabletState.UNASSIGNED;
+    }
+    final boolean isCurrentOrFuture = server.equals(current) || server.equals(future);
+    if (!isCurrentOrFuture) {
+      // only a last location is known
+      return TabletState.UNASSIGNED;
+    }
+    if (!liveServers.contains(server)) {
+      return TabletState.ASSIGNED_TO_DEAD_SERVER;
+    }
+    return server.equals(future) ? TabletState.ASSIGNED : TabletState.HOSTED;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletMigration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletMigration.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletMigration.java
new file mode 100644
index 0000000..f0a3664
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletMigration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import org.apache.accumulo.core.data.KeyExtent;
+
+/**
+ * Simple holder describing the move of one tablet from one tablet server to another.
+ */
+public class TabletMigration {
+  // the tablet being moved
+  public KeyExtent tablet;
+  // the server the tablet is moving away from
+  public TServerInstance oldServer;
+  // the server the tablet is moving to
+  public TServerInstance newServer;
+  
+  /**
+   * @param extent
+   *          the tablet being moved
+   * @param before
+   *          the server the tablet is moving away from
+   * @param after
+   *          the server the tablet is moving to
+   */
+  public TabletMigration(KeyExtent extent, TServerInstance before, TServerInstance after) {
+    tablet = extent;
+    oldServer = before;
+    newServer = after;
+  }
+  
+  /**
+   * @return text of the form {@code tablet: oldServer -> newServer}
+   */
+  public String toString() {
+    final StringBuilder text = new StringBuilder();
+    text.append(tablet).append(": ");
+    text.append(oldServer).append(" -> ").append(newServer);
+    return text.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletServerState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletServerState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletServerState.java
new file mode 100644
index 0000000..23f16e3
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletServerState.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+public enum TabletServerState {
+  // not a valid state, reserved for internal use only
+  RESERVED((byte) (-1)),
+  
+  // the following are normally functioning states
+  NEW((byte) 0),
+  ONLINE((byte) 1),
+  UNRESPONSIVE((byte) 2),
+  DOWN((byte) 3),
+  
+  // the following are bad states and cause tservers to be ignored by the master
+  BAD_SYSTEM_PASSWORD((byte) 101),
+  BAD_VERSION((byte) 102),
+  BAD_INSTANCE((byte) 103),
+  BAD_CONFIG((byte) 104),
+  BAD_VERSION_AND_INSTANCE((byte) 105),
+  BAD_VERSION_AND_CONFIG((byte) 106),
+  BAD_VERSION_AND_INSTANCE_AND_CONFIG((byte) 107),
+  BAD_INSTANCE_AND_CONFIG((byte) 108);
+  
+  private byte id;
+  
+  private static HashMap<Byte,TabletServerState> mapping;
+  private static HashSet<TabletServerState> badStates;
+  
+  static {
+    mapping = new HashMap<Byte,TabletServerState>(TabletServerState.values().length);
+    badStates = new HashSet<TabletServerState>();
+    for (TabletServerState state : TabletServerState.values()) {
+      mapping.put(state.id, state);
+      if (state.id > 99)
+        badStates.add(state);
+    }
+  }
+  
+  private TabletServerState(byte id) {
+    this.id = id;
+  }
+  
+  public byte getId() {
+    return this.id;
+  }
+  
+  public static TabletServerState getStateById(byte id) {
+    if (mapping.containsKey(id))
+      return mapping.get(id);
+    throw new IndexOutOfBoundsException("No such state");
+  }
+  
+  public static Set<TabletServerState> getBadStates() {
+    return Collections.unmodifiableSet(badStates);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java
new file mode 100644
index 0000000..d69ca19
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletState.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+/**
+ * Assignment state of a tablet, as computed by {@code TabletLocationState.getState}.
+ */
+public enum TabletState {
+  /** The tablet has neither a current nor a future location. */
+  UNASSIGNED,
+  /** The tablet has a future location on a live server. */
+  ASSIGNED,
+  /** The tablet has a current location on a live server. */
+  HOSTED,
+  /** The tablet's current or future location is a server that is not live. */
+  ASSIGNED_TO_DEAD_SERVER
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
new file mode 100644
index 0000000..ddcdeea
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SkippingIterator;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Scan-time filter over whole-row tablet entries that skips tablets already in their expected state, so that only tablets needing attention are returned.
+ * <p>
+ * The live servers, online tables and in-progress merges are passed as iterator options, set with {@link #setCurrentServers}, {@link #setOnlineTables} and
+ * {@link #setMerges}. If the servers or tables option is missing, nothing is filtered.
+ */
+public class TabletStateChangeIterator extends SkippingIterator {
+  
+  private static final String SERVERS_OPTION = "servers";
+  private static final String TABLES_OPTION = "tables";
+  private static final String MERGES_OPTION = "merges";
+  // private static final Logger log = Logger.getLogger(TabletStateChangeIterator.class);
+  
+  // Base64 text is plain ASCII; an explicit charset keeps the conversion independent of the platform default
+  private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");
+  
+  // each of these is null when the corresponding option was not supplied
+  Set<TServerInstance> current;
+  Set<String> onlineTables;
+  Map<Text,MergeInfo> merges;
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    current = parseServers(options.get(SERVERS_OPTION));
+    onlineTables = parseTables(options.get(TABLES_OPTION));
+    merges = parseMerges(options.get(MERGES_OPTION));
+  }
+  
+  /**
+   * Parses a comma-separated list of table ids.
+   * 
+   * @return the set of ids, or null if the option is absent
+   */
+  private Set<String> parseTables(String tables) {
+    if (tables == null)
+      return null;
+    Set<String> result = new HashSet<String>();
+    for (String table : tables.split(","))
+      result.add(table);
+    return result;
+  }
+  
+  /**
+   * Parses a comma-separated list of servers, each written as {@code host:port[INSTANCE]}.
+   * 
+   * @return the set of servers, or null if the option is absent
+   * @throws IllegalArgumentException
+   *           if an entry has no {@code [} separating the address from the instance
+   */
+  private Set<TServerInstance> parseServers(String servers) {
+    if (servers == null)
+      return null;
+    // parse "host:port[INSTANCE]"
+    Set<TServerInstance> result = new HashSet<TServerInstance>();
+    if (servers.length() > 0) {
+      for (String part : servers.split(",")) {
+        String parts[] = part.split("\\[", 2);
+        if (parts.length != 2)
+          throw new IllegalArgumentException("Malformed tablet server instance, expected host:port[INSTANCE]: " + part);
+        String hostport = parts[0];
+        String instance = parts[1];
+        if (instance.endsWith("]"))
+          instance = instance.substring(0, instance.length() - 1);
+        result.add(new TServerInstance(AddressUtil.parseAddress(hostport), instance));
+      }
+    }
+    return result;
+  }
+  
+  /**
+   * Decodes the Base64 option written by {@link #setMerges} into a map from table id to merge information.
+   * 
+   * @return the decoded merges, or null if the option is absent
+   * @throws RuntimeException
+   *           wrapping any failure while decoding
+   */
+  private Map<Text,MergeInfo> parseMerges(String merges) {
+    if (merges == null)
+      return null;
+    try {
+      Map<Text,MergeInfo> result = new HashMap<Text,MergeInfo>();
+      DataInputBuffer buffer = new DataInputBuffer();
+      byte[] data = Base64.decodeBase64(merges.getBytes(UTF8));
+      buffer.reset(data, data.length);
+      while (buffer.available() > 0) {
+        MergeInfo mergeInfo = new MergeInfo();
+        mergeInfo.readFields(buffer);
+        result.put(mergeInfo.extent.getTableId(), mergeInfo);
+      }
+      return result;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+  
+  /**
+   * Advances the source past tablets that are in their expected state. Returning leaves the current entry on top so it is reported; only the fall-out of the
+   * switch advances to the next entry.
+   */
+  @Override
+  protected void consume() throws IOException {
+    while (getSource().hasTop()) {
+      Key k = getSource().getTopKey();
+      Value v = getSource().getTopValue();
+      
+      // without the server and table information nothing can be judged, so report everything
+      if (onlineTables == null || current == null)
+        return;
+      
+      TabletLocationState tls;
+      try {
+        tls = MetaDataTableScanner.createTabletLocationState(k, v);
+        if (tls == null)
+          return;
+      } catch (BadLocationStateException e) {
+        // maybe the master can do something with a tablet with bad/inconsistent state
+        return;
+      }
+      // we always want data about merges
+      MergeInfo merge = merges == null ? null : merges.get(tls.extent.getTableId());
+      if (merge != null && merge.getExtent() != null && merge.getExtent().overlaps(tls.extent)) {
+        return;
+      }
+      // is the table supposed to be online or offline?
+      boolean shouldBeOnline = onlineTables.contains(tls.extent.getTableId().toString());
+      
+      switch (tls.getState(current)) {
+        case ASSIGNED:
+          // we always want data about assigned tablets
+          return;
+        case HOSTED:
+          if (!shouldBeOnline)
+            return;
+          // hosted and supposed to be online: expected state, skip it (must not fall through to the next case)
+          break;
+        case ASSIGNED_TO_DEAD_SERVER:
+          return;
+        case UNASSIGNED:
+          if (shouldBeOnline)
+            return;
+          break;
+      }
+      // table is in the expected state so don't bother returning any information about it
+      getSource().next();
+    }
+  }
+  
+  /**
+   * Not supported.
+   * 
+   * @throws UnsupportedOperationException
+   *           always
+   */
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+  
+  /**
+   * Stores the live servers on the iterator configuration as a comma-separated list of their string forms. Does nothing if {@code goodServers} is null.
+   */
+  public static void setCurrentServers(IteratorSetting cfg, Set<TServerInstance> goodServers) {
+    if (goodServers != null) {
+      List<String> servers = new ArrayList<String>();
+      for (TServerInstance server : goodServers)
+        servers.add(server.toString());
+      cfg.addOption(SERVERS_OPTION, StringUtil.join(servers, ","));
+    }
+  }
+  
+  /**
+   * Stores the online table ids on the iterator configuration as a comma-separated list. Does nothing if {@code onlineTables} is null.
+   */
+  public static void setOnlineTables(IteratorSetting cfg, Set<String> onlineTables) {
+    if (onlineTables != null)
+      cfg.addOption(TABLES_OPTION, StringUtil.join(onlineTables, ","));
+  }
+  
+  /**
+   * Stores the merges that have an extent and a state other than {@link MergeState#NONE} on the iterator configuration, serialized and Base64 encoded.
+   * 
+   * @throws RuntimeException
+   *           wrapping any failure while serializing
+   */
+  public static void setMerges(IteratorSetting cfg, Collection<MergeInfo> merges) {
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    try {
+      for (MergeInfo info : merges) {
+        KeyExtent extent = info.getExtent();
+        if (extent != null && !info.getState().equals(MergeState.NONE)) {
+          info.write(buffer);
+        }
+      }
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    // getData() returns the whole backing array, so copy only the bytes actually written
+    String encoded = new String(Base64.encodeBase64(Arrays.copyOf(buffer.getData(), buffer.getLength())), UTF8);
+    cfg.addOption(MERGES_OPTION, encoded);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
new file mode 100644
index 0000000..5e19976
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.accumulo.server.master.state.TabletLocationState;
+
+/**
+ * Interface for storing information about tablet assignments. There are three implementations:
+ * <ul>
+ * <li>ZooTabletStateStore: information about the root tablet is stored in ZooKeeper</li>
+ * <li>RootTabletStateStore: used for tablets whose extent reports {@code isMeta()}</li>
+ * <li>MetaDataStateStore: information about the other tablets is stored in the metadata table</li>
+ * </ul>
+ */
+public abstract class TabletStateStore implements Iterable<TabletLocationState> {
+  
+  /**
+   * Identifying name for this tablet state store.
+   */
+  abstract public String name();
+  
+  /**
+   * Scan the information about the tablets covered by this store
+   */
+  @Override
+  abstract public Iterator<TabletLocationState> iterator();
+  
+  /**
+   * Store the assigned locations in the data store.
+   * 
+   * @param assignments
+   *          the tablet/server pairs to record as future locations
+   * @throws DistributedStoreException
+   *           if the implementation fails to record the assignments
+   */
+  abstract public void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException;
+  
+  /**
+   * Tablet servers will update the data store with the location when they bring the tablet online
+   * 
+   * @param assignments
+   *          the tablet/server pairs to record as current locations
+   * @throws DistributedStoreException
+   *           if the implementation fails to record the locations
+   */
+  abstract public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException;
+  
+  /**
+   * Mark the tablets as having no known or future location.
+   * 
+   * @param tablets
+   *          the tablets' current information
+   * @throws DistributedStoreException
+   *           if the implementation fails to clear the locations
+   */
+  abstract public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException;
+  
+  /**
+   * Unassign a single tablet using the store that is responsible for it.
+   * 
+   * @param tls
+   *          the tablet's current information
+   * @throws DistributedStoreException
+   *           if the store cannot be created or fails to clear the location
+   */
+  public static void unassign(TabletLocationState tls) throws DistributedStoreException {
+    storeFor(tls.extent).unassign(Collections.singletonList(tls));
+  }
+  
+  /**
+   * Record the current location of a single tablet using the store that is responsible for it.
+   * 
+   * @param assignment
+   *          the tablet and the server now hosting it
+   * @throws DistributedStoreException
+   *           if the store cannot be created or fails to record the location
+   */
+  public static void setLocation(Assignment assignment) throws DistributedStoreException {
+    storeFor(assignment.tablet).setLocations(Collections.singletonList(assignment));
+  }
+  
+  /**
+   * Pick the store responsible for the given tablet: ZooTabletStateStore for the root tablet, RootTabletStateStore for other metadata tablets, and
+   * MetaDataStateStore for everything else. The root check comes first, so the root tablet never reaches the {@code isMeta()} test.
+   */
+  private static TabletStateStore storeFor(org.apache.accumulo.core.data.KeyExtent extent) throws DistributedStoreException {
+    if (extent.isRootTablet()) {
+      return new ZooTabletStateStore();
+    } else if (extent.isMeta()) {
+      return new RootTabletStateStore();
+    } else {
+      return new MetaDataStateStore();
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
new file mode 100644
index 0000000..bce6681
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooStore.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Logger;
+
+public class ZooStore implements DistributedStore {
+  
+  private static final Logger log = Logger.getLogger(ZooStore.class);
+  
+  String basePath;
+  
+  ZooCache cache = new ZooCache();
+  
+  public ZooStore(String basePath) throws IOException {
+    if (basePath.endsWith("/"))
+      basePath = basePath.substring(0, basePath.length() - 1);
+    this.basePath = basePath;
+  }
+  
+  public ZooStore() throws IOException {
+    this(ZooUtil.getRoot(HdfsZooInstance.getInstance().getInstanceID()));
+  }
+  
+  @Override
+  public byte[] get(String path) throws DistributedStoreException {
+    try {
+      return cache.get(relative(path));
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    }
+  }
+  
+  private String relative(String path) {
+    return basePath + path;
+  }
+  
+  @Override
+  public List<String> getChildren(String path) throws DistributedStoreException {
+    try {
+      return cache.getChildren(relative(path));
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    }
+  }
+  
+  @Override
+  public void put(String path, byte[] bs) throws DistributedStoreException {
+    try {
+      path = relative(path);
+      ZooReaderWriter.getInstance().putPersistentData(path, bs, NodeExistsPolicy.OVERWRITE);
+      cache.clear();
+      log.debug("Wrote " + new String(bs) + " to " + path);
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    }
+  }
+  
+  @Override
+  public void remove(String path) throws DistributedStoreException {
+    try {
+      log.debug("Removing " + path);
+      path = relative(path);
+      IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+      if (zoo.exists(path))
+        zoo.recursiveDelete(path, NodeMissingPolicy.SKIP);
+      cache.clear();
+    } catch (Exception ex) {
+      throw new DistributedStoreException(ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
new file mode 100644
index 0000000..2c5b709
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.state;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.log4j.Logger;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * TabletStateStore for the root tablet. The future, current and last locations and the write-ahead log entries are read from and written to a
+ * DistributedStore under the RootTable.ZROOT_TABLET_* paths.
+ */
+public class ZooTabletStateStore extends TabletStateStore {
+  
+  private static final Logger log = Logger.getLogger(ZooTabletStateStore.class);
+  final private DistributedStore store;
+  
+  public ZooTabletStateStore(DistributedStore store) {
+    this.store = store;
+  }
+  
+  /**
+   * Create a state store backed by a new ZooStore.
+   * 
+   * @throws DistributedStoreException
+   *           if the ZooStore cannot be created
+   */
+  public ZooTabletStateStore() throws DistributedStoreException {
+    try {
+      store = new ZooStore();
+    } catch (IOException ex) {
+      throw new DistributedStoreException(ex);
+    }
+  }
+  
+  /**
+   * Returns an iterator over exactly one element: the state of the root tablet, read from the store when {@code next()} is called.
+   */
+  @Override
+  public Iterator<TabletLocationState> iterator() {
+    return new Iterator<TabletLocationState>() {
+      boolean finished = false;
+      
+      @Override
+      public boolean hasNext() {
+        return !finished;
+      }
+      
+      @Override
+      public TabletLocationState next() {
+        // honor the Iterator contract: there is nothing after the single root tablet entry
+        if (finished)
+          throw new java.util.NoSuchElementException("There is only one root tablet");
+        finished = true;
+        try {
+          byte[] future = store.get(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
+          byte[] current = store.get(RootTable.ZROOT_TABLET_LOCATION);
+          byte[] last = store.get(RootTable.ZROOT_TABLET_LAST_LOCATION);
+          
+          TServerInstance currentSession = null;
+          TServerInstance futureSession = null;
+          TServerInstance lastSession = null;
+          
+          if (future != null)
+            futureSession = parse(future);
+          
+          if (last != null)
+            lastSession = parse(last);
+          
+          if (current != null) {
+            currentSession = parse(current);
+            // a stored current location takes precedence over any leftover future location
+            futureSession = null;
+          }
+          List<Collection<String>> logs = new ArrayList<Collection<String>>();
+          for (String entry : store.getChildren(RootTable.ZROOT_TABLET_WALOGS)) {
+            byte[] logInfo = store.get(RootTable.ZROOT_TABLET_WALOGS + "/" + entry);
+            if (logInfo != null) {
+              MetadataTableUtil.LogEntry logEntry = new MetadataTableUtil.LogEntry();
+              logEntry.fromBytes(logInfo);
+              logs.add(logEntry.logSet);
+              log.debug("root tablet logSet " + logEntry.logSet);
+            }
+          }
+          TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, logs, false);
+          log.debug("Returning root tablet state: " + result);
+          return result;
+        } catch (Exception ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+      
+      @Override
+      public void remove() {
+        throw new NotImplementedException();
+      }
+    };
+  }
+  
+  /**
+   * Decode a stored location of the form {@code address|session}.
+   * 
+   * @return the server instance, or null when the value has no session part
+   */
+  protected TServerInstance parse(byte[] current) {
+    String str = new String(current);
+    String[] parts = str.split("[|]", 2);
+    HostAndPort address = HostAndPort.fromString(parts[0]);
+    if (parts.length > 1 && parts[1] != null && parts[1].length() > 0) {
+      return new TServerInstance(address, parts[1]);
+    } else {
+      // a 1.2 location specification: DO NOT WANT
+      return null;
+    }
+  }
+  
+  /**
+   * Check that the collection holds exactly one assignment and that it is for the root tablet.
+   * 
+   * @return the single assignment
+   * @throws IllegalArgumentException
+   *           if there is not exactly one assignment or it is not for the root tablet
+   */
+  private static Assignment singleRootAssignment(Collection<Assignment> assignments) {
+    if (assignments.size() != 1)
+      throw new IllegalArgumentException("There is only one root tablet");
+    Assignment assignment = assignments.iterator().next();
+    if (assignment.tablet.compareTo(RootTable.EXTENT) != 0)
+      throw new IllegalArgumentException("You can only store the root tablet location");
+    return assignment;
+  }
+  
+  /**
+   * Encode the assignment's server in the {@code location|session} form that {@link #parse(byte[])} reads back.
+   */
+  private static String encodeLocation(Assignment assignment) {
+    return assignment.server.getLocation() + "|" + assignment.server.getSession();
+  }
+  
+  /**
+   * Record the server the root tablet is being assigned to.
+   * 
+   * @throws IllegalArgumentException
+   *           if there is not exactly one assignment or it is not for the root tablet
+   * @throws DistributedStoreException
+   *           if the root tablet already has a current location, or the store fails
+   */
+  @Override
+  public void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException {
+    Assignment assignment = singleRootAssignment(assignments);
+    String value = encodeLocation(assignment);
+    TabletLocationState current = iterator().next();
+    if (current.current != null) {
+      throw new DistributedStoreException("Trying to set the root tablet location: it is already set to " + current.current);
+    }
+    store.put(RootTable.ZROOT_TABLET_FUTURE_LOCATION, value.getBytes());
+  }
+  
+  /**
+   * Record the server now hosting the root tablet. The server must match the previously stored future location.
+   * 
+   * @throws IllegalArgumentException
+   *           if there is not exactly one assignment or it is not for the root tablet
+   * @throws DistributedStoreException
+   *           if the root tablet already has a current location, has no future location, is assigned to a different server, or the store fails
+   */
+  @Override
+  public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
+    Assignment assignment = singleRootAssignment(assignments);
+    String value = encodeLocation(assignment);
+    TabletLocationState current = iterator().next();
+    if (current.current != null) {
+      throw new DistributedStoreException("Trying to set the root tablet location: it is already set to " + current.current);
+    }
+    // without this check a missing future location would surface as a NullPointerException below
+    if (current.future == null) {
+      throw new DistributedStoreException("Trying to set the root tablet location to " + assignment.server + " but it has no future location");
+    }
+    if (!current.future.equals(assignment.server)) {
+      throw new DistributedStoreException("Root tablet is already assigned to " + current.future);
+    }
+    store.put(RootTable.ZROOT_TABLET_LOCATION, value.getBytes());
+    store.put(RootTable.ZROOT_TABLET_LAST_LOCATION, value.getBytes());
+    // Make the following unnecessary by making the entire update atomic
+    store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
+    log.debug("Put down root tablet location");
+  }
+  
+  /**
+   * Remove the current and future locations of the root tablet. The last location is left in place.
+   * 
+   * @throws IllegalArgumentException
+   *           if there is not exactly one tablet or it is not the root tablet
+   */
+  @Override
+  public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException {
+    if (tablets.size() != 1)
+      throw new IllegalArgumentException("There is only one root tablet");
+    TabletLocationState tls = tablets.iterator().next();
+    if (tls.extent.compareTo(RootTable.EXTENT) != 0)
+      throw new IllegalArgumentException("You can only store the root tablet location");
+    store.remove(RootTable.ZROOT_TABLET_LOCATION);
+    store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION);
+    log.debug("unassign root tablet location");
+  }
+  
+  @Override
+  public String name() {
+    return "Root Table";
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/CompactionIterators.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/CompactionIterators.java b/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/CompactionIterators.java
new file mode 100644
index 0000000..4f5bf42
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/tableOps/CompactionIterators.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.master.tableOps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable holder for an optional start row, an optional end row and a list of iterator settings.
+ */
+public class CompactionIterators implements Writable {
+  byte[] startRow;
+  byte[] endRow;
+  List<IteratorSetting> iterators;
+  
+  public CompactionIterators(byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) {
+    this.startRow = startRow;
+    this.endRow = endRow;
+    this.iterators = iterators;
+  }
+  
+  /**
+   * Create an instance with no rows and an empty iterator list.
+   */
+  public CompactionIterators() {
+    this(null, null, Collections.<IteratorSetting> emptyList());
+  }
+  
+  /**
+   * Write a presence flag for the row, followed by its length and bytes when it is present.
+   */
+  private static void writeRow(DataOutput out, byte[] row) throws IOException {
+    boolean present = row != null;
+    out.writeBoolean(present);
+    if (present) {
+      out.writeInt(row.length);
+      out.write(row);
+    }
+  }
+  
+  /**
+   * Serialized form: the start row, then the end row (each as a presence flag plus length-prefixed bytes), then the iterator count and each iterator
+   * setting.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    writeRow(out, startRow);
+    writeRow(out, endRow);
+    
+    out.writeInt(iterators.size());
+    for (IteratorSetting setting : iterators) {
+      setting.write(out);
+    }
+  }
+  
+  /**
+   * Read back the form produced by {@link #write(DataOutput)}, replacing the rows and the iterator list.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (!in.readBoolean()) {
+      startRow = null;
+    } else {
+      startRow = new byte[in.readInt()];
+      in.readFully(startRow);
+    }
+    
+    if (!in.readBoolean()) {
+      endRow = null;
+    } else {
+      endRow = new byte[in.readInt()];
+      in.readFully(endRow);
+    }
+    
+    int count = in.readInt();
+    iterators = new ArrayList<IteratorSetting>(count);
+    
+    for (int remaining = count; remaining > 0; remaining--) {
+      iterators.add(new IteratorSetting(in));
+    }
+  }
+  
+  /**
+   * @return the end row as Text, or null when there is none
+   */
+  public Text getEndRow() {
+    return endRow == null ? null : new Text(endRow);
+  }
+  
+  /**
+   * @return the start row as Text, or null when there is none
+   */
+  public Text getStartRow() {
+    return startRow == null ? null : new Text(startRow);
+  }
+  
+  public List<IteratorSetting> getIterators() {
+    return iterators;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
new file mode 100644
index 0000000..9735371
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.time.DateUtils;
+
+public abstract class AbstractMetricsImpl {
+  
+  public class Metric {
+    
+    private long count = 0;
+    private long avg = 0;
+    private long min = 0;
+    private long max = 0;
+    
+    public long getCount() {
+      return count;
+    }
+    
+    public long getAvg() {
+      return avg;
+    }
+    
+    public long getMin() {
+      return min;
+    }
+    
+    public long getMax() {
+      return max;
+    }
+    
+    public void incCount() {
+      count++;
+    }
+    
+    public void addAvg(long a) {
+      if (a < 0)
+        return;
+      avg = (long) ((avg * .8) + (a * .2));
+    }
+    
+    public void addMin(long a) {
+      if (a < 0)
+        return;
+      min = Math.min(min, a);
+    }
+    
+    public void addMax(long a) {
+      if (a < 0)
+        return;
+      max = Math.max(max, a);
+    }
+    
+    @Override
+    public String toString() {
+      return new ToStringBuilder(this).append("count", count).append("average", avg).append("minimum", min).append("maximum", max).toString();
+    }
+    
+  }
+  
+  static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AbstractMetricsImpl.class);
+  
+  private static ConcurrentHashMap<String,Metric> registry = new ConcurrentHashMap<String,Metric>();
+  
+  private boolean currentlyLogging = false;
+  
+  private File logDir = null;
+  
+  private String metricsPrefix = null;
+  
+  private Date today = new Date();
+  
+  private File logFile = null;
+  
+  private FileWriter logWriter = null;
+  
+  private SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
+  
+  private SimpleDateFormat logFormatter = new SimpleDateFormat("yyyyMMddhhmmssz");
+  
+  private MetricsConfiguration config = null;
+  
+  public AbstractMetricsImpl() {
+    this.metricsPrefix = getMetricsPrefix();
+    config = new MetricsConfiguration(metricsPrefix);
+  }
+  
+  /**
+   * Registers a StandardMBean with the MBean Server
+   * 
+   * @throws Exception
+   */
+  public void register(StandardMBean mbean) throws Exception {
+    // Register this object with the MBeanServer
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    if (null == getObjectName())
+      throw new IllegalArgumentException("MBean object name must be set.");
+    mbs.registerMBean(mbean, getObjectName());
+    
+    setupLogging();
+  }
+  
+  /**
+   * Registers this MBean with the MBean Server
+   * 
+   * @throws Exception
+   */
+  public void register() throws Exception {
+    // Register this object with the MBeanServer
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    if (null == getObjectName())
+      throw new IllegalArgumentException("MBean object name must be set.");
+    mbs.registerMBean(this, getObjectName());
+    setupLogging();
+  }
+  
+  public void createMetric(String name) {
+    registry.put(name, new Metric());
+  }
+  
+  public Metric getMetric(String name) {
+    return registry.get(name);
+  }
+  
+  public long getMetricCount(String name) {
+    return registry.get(name).getCount();
+  }
+  
+  public long getMetricAvg(String name) {
+    return registry.get(name).getAvg();
+  }
+  
+  public long getMetricMin(String name) {
+    return registry.get(name).getMin();
+  }
+  
+  public long getMetricMax(String name) {
+    return registry.get(name).getMax();
+  }
+  
+  private void setupLogging() throws IOException {
+    if (null == config.getMetricsConfiguration())
+      return;
+    // If we are already logging, then return
+    if (!currentlyLogging && config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) {
+      // Check to see if directory exists, else make it
+      String mDir = config.getMetricsConfiguration().getString("logging.dir");
+      if (null != mDir) {
+        File dir = new File(mDir);
+        if (!dir.isDirectory())
+          dir.mkdir();
+        logDir = dir;
+        // Create new log file
+        startNewLog();
+      }
+      currentlyLogging = true;
+    }
+  }
+  
+  private void startNewLog() throws IOException {
+    if (null != logWriter) {
+      logWriter.flush();
+      logWriter.close();
+    }
+    logFile = new File(logDir, metricsPrefix + "-" + formatter.format(today) + ".log");
+    if (!logFile.exists()) {
+      if (!logFile.createNewFile()) {
+        log.error("Unable to create new log file");
+        currentlyLogging = false;
+        return;
+      }
+    }
+    logWriter = new FileWriter(logFile, true);
+  }
+  
+  private void writeToLog(String name) throws IOException {
+    if (null == logWriter)
+      return;
+    // Increment the date if we have to
+    Date now = new Date();
+    if (!DateUtils.isSameDay(today, now)) {
+      today = now;
+      startNewLog();
+    }
+    logWriter.append(logFormatter.format(now)).append(" Metric: ").append(name).append(": ").append(registry.get(name).toString()).append("\n");
+  }
+  
+  public void add(String name, long time) {
+    if (isEnabled()) {
+      registry.get(name).incCount();
+      registry.get(name).addAvg(time);
+      registry.get(name).addMin(time);
+      registry.get(name).addMax(time);
+      // If we are not currently logging and should be, then initialize
+      if (!currentlyLogging && config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) {
+        try {
+          setupLogging();
+        } catch (IOException ioe) {
+          log.error("Error setting up log", ioe);
+        }
+      } else if (currentlyLogging && !config.getMetricsConfiguration().getBoolean(metricsPrefix + ".logging", false)) {
+        // if we are currently logging and shouldn't be, then close logs
+        try {
+          logWriter.flush();
+          logWriter.close();
+          logWriter = null;
+          logFile = null;
+        } catch (Exception e) {
+          log.error("Error stopping metrics logging", e);
+        }
+        currentlyLogging = false;
+      }
+      if (currentlyLogging) {
+        try {
+          writeToLog(name);
+        } catch (IOException ioe) {
+          log.error("Error writing to metrics log", ioe);
+        }
+      }
+    }
+  }
+  
+  public boolean isEnabled() {
+    return config.isEnabled();
+  }
+  
+  protected abstract ObjectName getObjectName();
+  
+  protected abstract String getMetricsPrefix();
+  
+  @Override
+  protected void finalize() {
+    if (null != logWriter) {
+      try {
+        logWriter.close();
+      } catch (Exception e) {
+        // do nothing
+      } finally {
+        logWriter = null;
+      }
+    }
+    logFile = null;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsConfiguration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsConfiguration.java
new file mode 100644
index 0000000..446a548
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsConfiguration.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import java.io.File;
+import java.util.Iterator;
+
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.commons.configuration.AbstractFileConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.EnvironmentConfiguration;
+import org.apache.commons.configuration.SystemConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.configuration.event.ConfigurationEvent;
+import org.apache.commons.configuration.event.ConfigurationListener;
+import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+public class MetricsConfiguration {
+  
+  private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(MetricsConfiguration.class);
+  
+  private static final String metricsFileName = "accumulo-metrics.xml";
+  
+  private static boolean alreadyWarned = false;
+  
+  private boolean notFound = false;
+  
+  private int notFoundCount = 0;
+  
+  private static SystemConfiguration sysConfig = null;
+  
+  private static EnvironmentConfiguration envConfig = null;
+  
+  private XMLConfiguration xConfig = null;
+  
+  private Configuration config = null;
+  
+  private final Object lock = new Object();
+  
+  private boolean needsReloading = false;
+  
+  private long lastCheckTime = 0;
+  
+  private static long CONFIG_FILE_CHECK_INTERVAL = 1000 * 60 * 10; // 10 minutes
+  
+  private static int CONFIG_FILE_CHECK_COUNTER = 100;
+  
+  public final static long CONFIG_FILE_RELOAD_DELAY = 60000;
+  
+  private MetricsConfigWatcher watcher = null;
+  
+  private boolean enabled = false;
+  
+  private String enabledName = null;
+  
+  /**
+   * Background thread that pokes the XMLConfiguration file to see if it has changed. If it has, then the Configuration Listener will get an event.
+   * 
+   * Each pass sleeps for CONFIG_FILE_RELOAD_DELAY milliseconds and then reads one property from xConfig; the value read is discarded.
+   */
+  private class MetricsConfigWatcher extends Daemon {
+    public MetricsConfigWatcher() {}
+    
+    public void run() {
+      while (this.isAlive()) {
+        try {
+          Thread.sleep(MetricsConfiguration.CONFIG_FILE_RELOAD_DELAY);
+        } catch (InterruptedException ie) {
+          // Do Nothing
+          // An interrupt only cuts the sleep short; the loop carries on and the interrupt status is not restored.
+        }
+        // Only the access matters here, not the result.
+        // NOTE(review): presumably throws if "master.enabled" is absent from the file, which would end this thread - confirm
+        xConfig.getBoolean("master.enabled");
+      }
+    }
+  }
+  
+  /**
+   * ConfigurationListener that sets a flag to reload the XML config file
+   * 
+   * Only EVENT_RELOAD events set the needsReloading flag; every other event type is ignored.
+   */
+  private class MetricsConfigListener implements ConfigurationListener {
+    public void configurationChanged(ConfigurationEvent event) {
+      if (event.getType() == AbstractFileConfiguration.EVENT_RELOAD)
+        needsReloading = true;
+    }
+  }
+  
+  /**
+   * Create a configuration for the given metrics name and attempt an initial load of the metrics configuration.
+   * 
+   * @param name
+   *          the metrics prefix; the enabled flag is looked up under "&lt;name&gt;.enabled"
+   */
+  public MetricsConfiguration(String name) {
+    // We are going to store the "enabled" parameter for this
+    // name as a shortcut so that it doesn't have to be looked
+    // up in the configuration so much.
+    this.enabledName = name + ".enabled";
+    // result is ignored; the call is made for its loading side effect
+    getMetricsConfiguration();
+  }
+  
+  /**
+   * @return the shared environment configuration, created on first use
+   */
+  public Configuration getEnvironmentConfiguration() {
+    if (envConfig != null) {
+      return envConfig;
+    }
+    envConfig = new EnvironmentConfiguration();
+    return envConfig;
+  }
+  
+  /**
+   * @return the shared system-properties configuration, created on first use
+   */
+  public Configuration getSystemConfiguration() {
+    if (sysConfig != null) {
+      return sysConfig;
+    }
+    sysConfig = new SystemConfiguration();
+    return sysConfig;
+  }
+  
+  /**
+   * Return the metrics configuration, loading it first if it has not been loaded or a reload has been flagged.
+   * 
+   * @return the configuration, or null while the configuration is marked as not found and the retry conditions below are not met; may also be null if
+   *         loading did not produce a configuration
+   */
+  public Configuration getMetricsConfiguration() {
+    if (notFound) {
+      // NOTE(review): notFoundCount is only incremented in the last else branch, which cannot be reached while
+      // notFoundCount <= CONFIG_FILE_CHECK_COUNTER. Unless the counter is changed elsewhere in this class, this
+      // branch keeps returning null once notFound is set - confirm the intended retry behavior.
+      if (notFoundCount <= CONFIG_FILE_CHECK_COUNTER) {
+        return null;
+      } else if ((notFoundCount > CONFIG_FILE_CHECK_COUNTER) && ((System.currentTimeMillis() - lastCheckTime) > CONFIG_FILE_CHECK_INTERVAL)) {
+        // enough calls and enough time have passed: reset the bookkeeping and fall through to try loading again
+        notFoundCount = 0;
+        lastCheckTime = System.currentTimeMillis();
+        notFound = false;
+      } else {
+        notFoundCount++;
+      }
+    }
+    // NOTE(review): config and needsReloading are read here outside the lock and neither field is volatile - confirm
+    // that cross-thread visibility is acceptable.
+    if (null == config || needsReloading)
+      synchronized (lock) {
+        // re-check under the lock; both cases load the configuration
+        if (needsReloading) {
+          loadConfiguration();
+        } else if (null == config) {
+          loadConfiguration();
+        }
+        needsReloading = false;
+      }
+    return config;
+  }
+  
+  /**
+   * Loads the metrics XML file from the directory named by the ACCUMULO_CONF_DIR environment variable, registers the reload listener and strategy, starts the
+   * watcher thread if needed, and refreshes the cached "enabled" flag.
+   * <p>
+   * When the variable is not set, the file does not exist, or the file cannot be read, the configuration is marked as not found and nothing else changes.
+   */
+  private void loadConfiguration() {
+    // The metrics file is looked up in the directory named by ACCUMULO_CONF_DIR.
+    String confDir = getEnvironmentConfiguration().getString("ACCUMULO_CONF_DIR");
+    if (null == confDir) {
+      if (!alreadyWarned)
+        log.warn("ACCUMULO_CONF_DIR variable not found in environment. Metrics collection will be disabled.");
+      alreadyWarned = true;
+      notFound = true;
+      return;
+    }
+    
+    File mFile = new File(confDir, metricsFileName);
+    if (!mFile.exists()) {
+      if (!alreadyWarned)
+        log.warn("Unable to find metrics file: " + mFile.getAbsolutePath());
+      alreadyWarned = true;
+      notFound = true;
+      return;
+    }
+    
+    if (log.isDebugEnabled())
+      log.debug("Loading config file: " + mFile.getAbsolutePath());
+    try {
+      xConfig = new XMLConfiguration(mFile);
+      xConfig.append(getEnvironmentConfiguration());
+      xConfig.addConfigurationListener(new MetricsConfigListener());
+      xConfig.setReloadingStrategy(new FileChangedReloadingStrategy());
+      
+      // Start a background Thread that checks a property from the XMLConfiguration
+      // every so often to force the FileChangedReloadingStrategy to fire.
+      if (null == watcher || !watcher.isAlive()) {
+        watcher = new MetricsConfigWatcher();
+        watcher.start();
+      }
+      notFound = false;
+      alreadyWarned = false;
+    } catch (ConfigurationException ce) {
+      // Keep the cause in the log; the bare message gives no hint about what is wrong with the file.
+      log.error("Error reading accumulo-metrics.xml file.", ce);
+      notFound = true;
+      return;
+    }
+    
+    // xConfig is always set at this point; every failure path above has already returned.
+    config = xConfig.interpolatedConfiguration();
+    // Cache the enabled flag. A file without the key counts as disabled instead of failing
+    // with NoSuchElementException.
+    enabled = config.getBoolean(enabledName, false);
+    if (log.isDebugEnabled())
+      log.debug("Metrics collection enabled=" + enabled);
+  }
+  
+  /**
+   * Reports whether metrics collection is enabled for the name this object was created with.
+   * 
+   * @return false when no metrics configuration is available, otherwise the cached "enabled" flag
+   */
+  public boolean isEnabled() {
+    // Fetching the configuration first gives a pending load or reload the chance to run.
+    final Configuration current = getMetricsConfiguration();
+    return current != null && enabled;
+  }
+  
+  /**
+   * Renders every key of the given configuration together with its value.
+   * 
+   * @param config
+   *          configuration to render
+   * @return the text produced by a ToStringBuilder that was fed each key and value
+   */
+  public static String toStringValue(Configuration config) {
+    ToStringBuilder builder = new ToStringBuilder(MetricsConfiguration.class);
+    for (Iterator<?> it = config.getKeys(); it.hasNext();) {
+      builder.append("\n");
+      String key = (String) it.next();
+      // Prefer the string form of the value and fall back to the list form when there is none.
+      Object value = config.getString(key);
+      if (value == null) {
+        value = config.getList(key);
+      }
+      builder.append(key, value.toString());
+    }
+    return builder.toString();
+  }
+  
+  /**
+   * Manual test driver: prints once per second, without ever terminating, whether metrics are enabled for "master" and how long the check took.
+   */
+  public static void main(String[] args) throws Exception {
+    MetricsConfiguration mc = new MetricsConfiguration("master");
+    for (;;) {
+      System.out.println("------------------------------------------------------------------------------------------------");
+      long start = System.currentTimeMillis();
+      boolean on = mc.isEnabled();
+      long elapsed = System.currentTimeMillis() - start;
+      System.out.println(on + " took: " + elapsed);
+      Thread.sleep(1000);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java
new file mode 100644
index 0000000..cf7d3d5
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetrics.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import javax.management.ObjectName;
+
+
+/**
+ * MBean implementation that exposes the "idle" and "execute" metrics under the "thrift" prefix.
+ */
+public class ThriftMetrics extends AbstractMetricsImpl implements ThriftMetricsMBean {
+  
+  static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(ThriftMetrics.class);
+  
+  private static final String METRICS_PREFIX = "thrift";
+  
+  // Stays null when the name could not be built from the constructor arguments.
+  private ObjectName mbeanName = null;
+  
+  /**
+   * Creates both metrics and builds the MBean object name from the given server and thread names.
+   * 
+   * @param serverName
+   *          value of the "service" key in the object name
+   * @param threadName
+   *          value of the "instance" key in the object name
+   */
+  public ThriftMetrics(String serverName, String threadName) {
+    super();
+    reset();
+    try {
+      mbeanName = new ObjectName("accumulo.server.metrics:service=" + serverName + ",name=ThriftMetricsMBean,instance=" + threadName);
+    } catch (Exception e) {
+      log.error("Exception setting MBean object name", e);
+    }
+  }
+  
+  @Override
+  protected ObjectName getObjectName() {
+    return mbeanName;
+  }
+  
+  @Override
+  protected String getMetricsPrefix() {
+    return METRICS_PREFIX;
+  }
+  
+  @Override
+  public void reset() {
+    createMetric(idle);
+    createMetric(execute);
+  }
+  
+  // ---- "execute" metric ----
+  
+  @Override
+  public long getExecutionAvgTime() {
+    return getMetricAvg(execute);
+  }
+  
+  @Override
+  public long getExecutionCount() {
+    return getMetricCount(execute);
+  }
+  
+  @Override
+  public long getExecutionMaxTime() {
+    return getMetricMax(execute);
+  }
+  
+  @Override
+  public long getExecutionMinTime() {
+    return getMetricMin(execute);
+  }
+  
+  // ---- "idle" metric ----
+  
+  @Override
+  public long getIdleAvgTime() {
+    return getMetricAvg(idle);
+  }
+  
+  @Override
+  public long getIdleCount() {
+    return getMetricCount(idle);
+  }
+  
+  @Override
+  public long getIdleMaxTime() {
+    return getMetricMax(idle);
+  }
+  
+  @Override
+  public long getIdleMinTime() {
+    return getMetricMin(idle);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetricsMBean.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetricsMBean.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetricsMBean.java
new file mode 100644
index 0000000..11f94c2
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/ThriftMetricsMBean.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+/**
+ * Management interface for two metrics, named by {@link #idle} and {@link #execute}: a count, minimum, maximum and average for each, plus a reset operation.
+ */
+public interface ThriftMetricsMBean {
+  
+  /** Name of the idle metric. */
+  public static final String idle = "idle";
+  /** Name of the execute metric. */
+  public static final String execute = "execute";
+  
+  /** @return the count of the {@link #idle} metric */
+  public long getIdleCount();
+  
+  /** @return the minimum of the {@link #idle} metric */
+  public long getIdleMinTime();
+  
+  /** @return the maximum of the {@link #idle} metric */
+  public long getIdleMaxTime();
+  
+  /** @return the average of the {@link #idle} metric */
+  public long getIdleAvgTime();
+  
+  /** @return the count of the {@link #execute} metric */
+  public long getExecutionCount();
+  
+  /** @return the minimum of the {@link #execute} metric */
+  public long getExecutionMinTime();
+  
+  /** @return the maximum of the {@link #execute} metric */
+  public long getExecutionMaxTime();
+  
+  /** @return the average of the {@link #execute} metric */
+  public long getExecutionAvgTime();
+  
+  /** Resets the metrics; the implementation decides what that involves. */
+  public void reset();
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/monitor/DedupedLogEvent.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/monitor/DedupedLogEvent.java b/server/base/src/main/java/org/apache/accumulo/server/monitor/DedupedLogEvent.java
new file mode 100644
index 0000000..4acb1a9
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/monitor/DedupedLogEvent.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.monitor;
+
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * A log4j LoggingEvent together with a counter. The string form, which is also the source of the hash code, is "application:level:message".
+ */
+public class DedupedLogEvent {
+  
+  private final LoggingEvent event;
+  private int count = 0;
+  // Cached hash code; -1 means "not computed yet".
+  private int hash = -1;
+  
+  /**
+   * Wraps the event with a count of 1.
+   * 
+   * @param event
+   *          event to wrap
+   */
+  public DedupedLogEvent(LoggingEvent event) {
+    this(event, 1);
+  }
+  
+  /**
+   * Wraps the event with the given count.
+   * 
+   * @param event
+   *          event to wrap
+   * @param count
+   *          initial value of the counter
+   */
+  public DedupedLogEvent(LoggingEvent event, int count) {
+    this.event = event;
+    this.count = count;
+  }
+  
+  public LoggingEvent getEvent() {
+    return event;
+  }
+  
+  public int getCount() {
+    return count;
+  }
+  
+  public void setCount(int count) {
+    this.count = count;
+  }
+  
+  /**
+   * Builds the "application:level:message" string. String.valueOf is used so that an event without a message (or without an "application" MDC entry) yields
+   * "null" for that part instead of throwing a NullPointerException.
+   */
+  private String eventId() {
+    return String.valueOf(event.getMDC("application")) + ":" + String.valueOf(event.getLevel()) + ":" + String.valueOf(event.getMessage());
+  }
+  
+  /**
+   * @return the hash code of the "application:level:message" string, computed on first use and cached
+   */
+  @Override
+  public int hashCode() {
+    if (hash == -1) {
+      hash = eventId().hashCode();
+    }
+    return hash;
+  }
+  
+  /**
+   * @return true only if obj is a DedupedLogEvent whose wrapped event equals this one's wrapped event
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof DedupedLogEvent)
+      return this.event.equals(((DedupedLogEvent) obj).event);
+    return false;
+  }
+  
+  /**
+   * @return the "application:level:message" string of the wrapped event
+   */
+  @Override
+  public String toString() {
+    return eventId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java b/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
new file mode 100644
index 0000000..fd83e97
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/monitor/LogService.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.monitor;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.log4j.net.SocketNode;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * Hijack log4j and capture log events for display.
+ * <p>
+ * At most MAX_LOGS distinct events are kept, keyed by their "application:level:message" string; a repeated event replaces the stored one and increases its
+ * count.
+ */
+public class LogService extends org.apache.log4j.AppenderSkeleton {
+
+  private static final Logger log = Logger.getLogger(LogService.class);
+
+  /**
+   * Read logging events forward to us over the net.
+   * 
+   */
+  static class SocketServer implements Runnable {
+    private ServerSocket server = null;
+
+    /**
+     * Opens the listening socket.
+     * 
+     * @param port
+     *          port to listen on
+     * @throws RuntimeException
+     *           wrapping the IOException when the socket cannot be opened
+     */
+    public SocketServer(int port) {
+      try {
+        server = new ServerSocket(port);
+      } catch (IOException io) {
+        throw new RuntimeException(io);
+      }
+    }
+
+    public int getLocalPort() {
+      return server.getLocalPort();
+    }
+
+    /**
+     * Accepts connections forever and hands each one to a SocketNode running in its own daemon thread. The loop only ends when accepting fails.
+     */
+    @Override
+    public void run() {
+      try {
+        while (true) {
+          log.debug("Waiting for log message senders");
+          Socket socket = server.accept();
+          log.debug("Got a new connection");
+          Thread t = new Daemon(new SocketNode(socket, LogManager.getLoggerRepository()));
+          t.start();
+        }
+      } catch (IOException io) {
+        log.error(io, io);
+      } finally {
+        // The accept loop is over for good; do not leave the listening socket open.
+        try {
+          server.close();
+        } catch (IOException io) {
+          log.debug("Unable to close log listener socket", io);
+        }
+      }
+    }
+  }
+
+  /**
+   * Starts the socket server on the configured monitor log4j port and stores the port actually bound in ZooKeeper. Any failure is logged and otherwise
+   * ignored.
+   * 
+   * @param conf
+   *          configuration that supplies the port
+   * @param instanceId
+   *          instance whose ZooKeeper root receives the port
+   */
+  public static void startLogListener(AccumuloConfiguration conf, String instanceId) {
+    try {
+      SocketServer server = new SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT));
+      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instanceId) + Constants.ZMONITOR_LOG4J_PORT,
+          Integer.toString(server.getLocalPort()).getBytes(), NodeExistsPolicy.OVERWRITE);
+      new Daemon(server).start();
+    } catch (Throwable t) {
+      log.info("Unable to listen to cluster-wide ports", t);
+    }
+  }
+
+  static private LogService instance = null;
+
+  /**
+   * @return the most recently constructed LogService, creating one if none exists yet
+   */
+  synchronized public static LogService getInstance() {
+    if (instance == null)
+      // The constructor registers the new object as the instance.
+      return new LogService();
+    return instance;
+  }
+
+  private static final int MAX_LOGS = 50;
+
+  // Access-ordered map that evicts its eldest entry once more than MAX_LOGS events are stored.
+  private final LinkedHashMap<String,DedupedLogEvent> events = new LinkedHashMap<String,DedupedLogEvent>(MAX_LOGS + 1, (float) .75, true) {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    protected boolean removeEldestEntry(Map.Entry<String,DedupedLogEvent> eldest) {
+      return size() > MAX_LOGS;
+    }
+  };
+
+  /**
+   * Creates the appender and registers it as the instance returned by getInstance().
+   */
+  public LogService() {
+    synchronized (LogService.class) {
+      instance = this;
+    }
+  }
+
+  /**
+   * Stores the event, unless it carries no "application" MDC value. If an event with the same key is already stored, it is replaced and the count continues
+   * from the old one.
+   */
+  @Override
+  synchronized protected void append(LoggingEvent ev) {
+    Object application = ev.getMDC("application");
+    if (application == null || application.toString().isEmpty())
+      return;
+
+    DedupedLogEvent dev = new DedupedLogEvent(ev);
+    String key = dev.toString();
+
+    // if event is present, increase the count
+    DedupedLogEvent oldDev = events.remove(key);
+    if (oldDev != null) {
+      dev.setCount(oldDev.getCount() + 1);
+    }
+    events.put(key, dev);
+  }
+
+  /**
+   * Marks the appender as closed and drops the stored events. The map itself is kept, so getEvents() and clear() still work afterwards instead of failing
+   * with a NullPointerException.
+   */
+  @Override
+  public synchronized void close() {
+    closed = true;
+    events.clear();
+  }
+
+  @Override
+  public synchronized void doAppend(LoggingEvent event) {
+    super.doAppend(event);
+  }
+
+  @Override
+  public boolean requiresLayout() {
+    return false;
+  }
+
+  /**
+   * @return a copy of the stored events
+   */
+  synchronized public List<DedupedLogEvent> getEvents() {
+    return new ArrayList<DedupedLogEvent>(events.values());
+  }
+
+  /**
+   * Removes all stored events.
+   */
+  synchronized public void clear() {
+    events.clear();
+  }
+}