Posted to commits@accumulo.apache.org by ct...@apache.org on 2013/11/01 01:56:28 UTC

[49/54] [partial] ACCUMULO-658, ACCUMULO-656 Split server into separate modules

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
new file mode 100644
index 0000000..18381c7
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ZooConfiguration.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance.AccumuloNotInitializedException;
+import org.apache.log4j.Logger;
+
+public class ZooConfiguration extends AccumuloConfiguration {
+  private static final Logger log = Logger.getLogger(ZooConfiguration.class);
+  
+  private final AccumuloConfiguration parent;
+  private static ZooConfiguration instance = null;
+  private static String instanceId = null;
+  private static ZooCache propCache = null;
+  private final Map<String,String> fixedProps = Collections.synchronizedMap(new HashMap<String,String>());
+  
+  private ZooConfiguration(AccumuloConfiguration parent) {
+    this.parent = parent;
+  }
+  
+  public static synchronized ZooConfiguration getInstance(Instance inst, AccumuloConfiguration parent) {
+    if (instance == null) {
+      propCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
+      instance = new ZooConfiguration(parent);
+      instanceId = inst.getInstanceID();
+    }
+    return instance;
+  }
+  
+  public static synchronized ZooConfiguration getInstance(AccumuloConfiguration parent) {
+    if (instance == null) {
+      propCache = new ZooCache(parent.get(Property.INSTANCE_ZK_HOST), (int) parent.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT));
+      instance = new ZooConfiguration(parent);
+      String deprecatedInstanceIdFromHdfs = ZooUtil.getInstanceIDFromHdfs(ServerConstants.getInstanceIdLocation());
+      instanceId = deprecatedInstanceIdFromHdfs;
+    }
+    return instance;
+  }
+  
+  @Override
+  public void invalidateCache() {
+    if (propCache != null)
+      propCache.clear();
+  }
+  
+  private String _get(Property property) {
+    String key = property.getKey();
+    String value = null;
+    
+    if (Property.isValidZooPropertyKey(key)) {
+      try {
+        value = get(key);
+      } catch (AccumuloNotInitializedException e) {
+        log.warn("failed to lookup property in zookeeper: " + key, e);
+      }
+    }
+    
+    if (value == null || !property.getType().isValidFormat(value)) {
+      if (value != null)
+        log.error("Using parent value for " + key + " due to improperly formatted " + property.getType() + ": " + value);
+      value = parent.get(property);
+    }
+    return value;
+  }
+  
+  @Override
+  public String get(Property property) {
+    if (Property.isFixedZooPropertyKey(property)) {
+      if (fixedProps.containsKey(property.getKey())) {
+        return fixedProps.get(property.getKey());
+      } else {
+        synchronized (fixedProps) {
+          String val = _get(property);
+          fixedProps.put(property.getKey(), val);
+          return val;
+        }
+        
+      }
+    } else {
+      return _get(property);
+    }
+  }
+  
+  private String get(String key) {
+    String zPath = ZooUtil.getRoot(instanceId) + Constants.ZCONFIG + "/" + key;
+    byte[] v = propCache.get(zPath);
+    String value = null;
+    if (v != null)
+      value = new String(v);
+    return value;
+  }
+  
+  @Override
+  public Iterator<Entry<String,String>> iterator() {
+    TreeMap<String,String> entries = new TreeMap<String,String>();
+    
+    for (Entry<String,String> parentEntry : parent)
+      entries.put(parentEntry.getKey(), parentEntry.getValue());
+    
+    List<String> children = propCache.getChildren(ZooUtil.getRoot(instanceId) + Constants.ZCONFIG);
+    if (children != null) {
+      for (String child : children) {
+        if (child == null)
+          continue;
+        String value = get(child);
+        if (value != null)
+          entries.put(child, value);
+      }
+    }
+    
+    /*
+     * This code breaks the shell's ability to show updates just made; it is probably not needed,
+     * as fixed props are only obtained through get().
+     * 
+     * for(Property prop : Property.getFixedProperties()) get(prop);
+     * 
+     * for(Entry<String, String> fprop : fixedProps.entrySet()) entries.put(fprop.getKey(), fprop.getValue());
+     */
+    
+    return entries.entrySet().iterator();
+  }
+}
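
For context, a minimal sketch of how this configuration class is meant to be consumed; the parent configuration below is illustrative (any AccumuloConfiguration can serve as the fallback):

    // Sketch only: assumes a reachable instance via HdfsZooInstance.
    // ZooKeeper-backed values win; unset or malformed values fall back to the parent.
    Instance inst = HdfsZooInstance.getInstance();
    AccumuloConfiguration parent = DefaultConfiguration.getDefaultConfiguration();
    ZooConfiguration conf = ZooConfiguration.getInstance(inst, parent);
    String zkHosts = conf.get(Property.INSTANCE_ZK_HOST);
    conf.invalidateCache(); // clear the ZooCache so the next lookup re-reads ZooKeeper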

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
new file mode 100644
index 0000000..ce5e5e4
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.constraints;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.constraints.Constraint;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.server.zookeeper.ZooLock;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+public class MetadataConstraints implements Constraint {
+  
+  private ZooCache zooCache = null;
+  private String zooRoot = null;
+  
+  private static final Logger log = Logger.getLogger(MetadataConstraints.class);
+  
+  private static boolean[] validTableNameChars = new boolean[256];
+  
+  static {
+    for (int i = 0; i < 256; i++) {
+      validTableNameChars[i] = ((i >= 'a' && i <= 'z') || (i >= '0' && i <= '9')) || i == '!';
+    }
+  }
+  
+  private static final HashSet<ColumnFQ> validColumnQuals = new HashSet<ColumnFQ>(Arrays.asList(new ColumnFQ[] {
+      TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN,
+      TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN, TabletsSection.ServerColumnFamily.TIME_COLUMN,
+      TabletsSection.ServerColumnFamily.LOCK_COLUMN, TabletsSection.ServerColumnFamily.FLUSH_COLUMN, TabletsSection.ServerColumnFamily.COMPACT_COLUMN}));
+  
+  private static final HashSet<Text> validColumnFams = new HashSet<Text>(Arrays.asList(new Text[] {TabletsSection.BulkFileColumnFamily.NAME,
+      LogColumnFamily.NAME, ScanFileColumnFamily.NAME, DataFileColumnFamily.NAME,
+      TabletsSection.CurrentLocationColumnFamily.NAME, TabletsSection.LastLocationColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME,
+      ChoppedColumnFamily.NAME, ClonedColumnFamily.NAME}));
+  
+  private static boolean isValidColumn(ColumnUpdate cu) {
+    
+    if (validColumnFams.contains(new Text(cu.getColumnFamily())))
+      return true;
+    
+    if (validColumnQuals.contains(new ColumnFQ(cu)))
+      return true;
+    
+    return false;
+  }
+  
+  static private ArrayList<Short> addViolation(ArrayList<Short> lst, int violation) {
+    if (lst == null)
+      lst = new ArrayList<Short>();
+    lst.add((short) violation);
+    return lst;
+  }
+  
+  static private ArrayList<Short> addIfNotPresent(ArrayList<Short> lst, int intViolation) {
+    if (lst == null)
+      return addViolation(lst, intViolation);
+    short violation = (short) intViolation;
+    if (!lst.contains(violation))
+      return addViolation(lst, intViolation);
+    return lst;
+  }
+  
+  @Override
+  public List<Short> check(Environment env, Mutation mutation) {
+    
+    ArrayList<Short> violations = null;
+    
+    Collection<ColumnUpdate> colUpdates = mutation.getUpdates();
+    
+    // check the row: it should contain at least one ';' or end with '<'
+    boolean containsSemiC = false;
+    
+    byte[] row = mutation.getRow();
+    
+    // always allow rows that fall within reserved areas
+    if (row.length > 0 && row[0] == '~')
+      return null;
+    if (row.length > 2 && row[0] == '!' && row[1] == '!' && row[2] == '~')
+      return null;
+    
+    for (byte b : row) {
+      if (b == ';') {
+        containsSemiC = true;
+      }
+      
+      if (b == ';' || b == '<')
+        break;
+      
+      if (!validTableNameChars[0xff & b]) {
+        violations = addIfNotPresent(violations, 4);
+      }
+    }
+    
+    if (!containsSemiC) {
+      // see if last row char is <
+      if (row.length == 0 || row[row.length - 1] != '<') {
+        violations = addIfNotPresent(violations, 4);
+      }
+    } else {
+      if (row.length == 0) {
+        violations = addIfNotPresent(violations, 4);
+      }
+    }
+    
+    if (row.length > 0 && row[0] == '!') {
+      if (row.length < 3 || row[1] != '0' || (row[2] != '<' && row[2] != ';')) {
+        violations = addIfNotPresent(violations, 4);
+      }
+    }
+    
+    // ensure row is not less than MetadataTable.ID
+    if (new Text(row).compareTo(new Text(MetadataTable.ID)) < 0) {
+      violations = addViolation(violations, 5);
+    }
+    
+    boolean checkedBulk = false;
+    
+    for (ColumnUpdate columnUpdate : colUpdates) {
+      Text columnFamily = new Text(columnUpdate.getColumnFamily());
+      
+      if (columnUpdate.isDeleted()) {
+        if (!isValidColumn(columnUpdate)) {
+          violations = addViolation(violations, 2);
+        }
+        continue;
+      }
+      
+      if (columnUpdate.getValue().length == 0 && !columnFamily.equals(ScanFileColumnFamily.NAME)) {
+        violations = addViolation(violations, 6);
+      }
+      
+      if (columnFamily.equals(DataFileColumnFamily.NAME)) {
+        try {
+          DataFileValue dfv = new DataFileValue(columnUpdate.getValue());
+          
+          if (dfv.getSize() < 0 || dfv.getNumEntries() < 0) {
+            violations = addViolation(violations, 1);
+          }
+        } catch (NumberFormatException nfe) {
+          violations = addViolation(violations, 1);
+        } catch (ArrayIndexOutOfBoundsException aiooe) {
+          violations = addViolation(violations, 1);
+        }
+      } else if (columnFamily.equals(ScanFileColumnFamily.NAME)) {
+        // scan file entries may legitimately have empty values; nothing to validate here
+      } else if (columnFamily.equals(TabletsSection.BulkFileColumnFamily.NAME)) {
+        if (!columnUpdate.isDeleted() && !checkedBulk) {
+          // splits, which also write the time reference, are allowed to write this reference even when
+          // the transaction is not running because the other half of the tablet is holding a reference
+          // to the file.
+          boolean isSplitMutation = false;
+          // When a tablet is assigned, it re-writes the metadata. It should probably only update the location information,
+          // but it writes everything. We allow it to re-write the bulk information if it is setting the location.
+          // See ACCUMULO-1230.
+          boolean isLocationMutation = false;
+          
+          HashSet<Text> dataFiles = new HashSet<Text>();
+          HashSet<Text> loadedFiles = new HashSet<Text>();
+          
+          String tidString = new String(columnUpdate.getValue());
+          int otherTidCount = 0;
+          
+          for (ColumnUpdate update : mutation.getUpdates()) {
+            if (new ColumnFQ(update).equals(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN)) {
+              isSplitMutation = true;
+            } else if (new Text(update.getColumnFamily()).equals(TabletsSection.CurrentLocationColumnFamily.NAME)) {
+              isLocationMutation = true;
+            } else if (new Text(update.getColumnFamily()).equals(DataFileColumnFamily.NAME)) {
+              dataFiles.add(new Text(update.getColumnQualifier()));
+            } else if (new Text(update.getColumnFamily()).equals(TabletsSection.BulkFileColumnFamily.NAME)) {
+              loadedFiles.add(new Text(update.getColumnQualifier()));
+              
+              if (!new String(update.getValue()).equals(tidString)) {
+                otherTidCount++;
+              }
+            }
+          }
+          
+          if (!isSplitMutation && !isLocationMutation) {
+            long tid = Long.parseLong(tidString);
+            
+            try {
+              if (otherTidCount > 0 || !dataFiles.equals(loadedFiles) || !getArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) {
+                violations = addViolation(violations, 8);
+              }
+            } catch (Exception ex) {
+              violations = addViolation(violations, 8);
+            }
+          }
+          
+          checkedBulk = true;
+        }
+      } else {
+        if (!isValidColumn(columnUpdate)) {
+          violations = addViolation(violations, 2);
+        } else if (new ColumnFQ(columnUpdate).equals(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN) && columnUpdate.getValue().length > 0
+            && (violations == null || !violations.contains((short) 4))) {
+          KeyExtent ke = new KeyExtent(new Text(mutation.getRow()), (Text) null);
+          
+          Text per = KeyExtent.decodePrevEndRow(new Value(columnUpdate.getValue()));
+          
+          boolean prevEndRowLessThanEndRow = per == null || ke.getEndRow() == null || per.compareTo(ke.getEndRow()) < 0;
+          
+          if (!prevEndRowLessThanEndRow) {
+            violations = addViolation(violations, 3);
+          }
+        } else if (new ColumnFQ(columnUpdate).equals(TabletsSection.ServerColumnFamily.LOCK_COLUMN)) {
+          if (zooCache == null) {
+            zooCache = new ZooCache();
+          }
+          
+          if (zooRoot == null) {
+            zooRoot = ZooUtil.getRoot(HdfsZooInstance.getInstance());
+          }
+          
+          boolean lockHeld = false;
+          String lockId = new String(columnUpdate.getValue());
+          
+          try {
+            lockHeld = ZooLock.isLockHeld(zooCache, new ZooUtil.LockID(zooRoot, lockId));
+          } catch (Exception e) {
+            log.debug("Failed to verify lock was held " + lockId + " " + e.getMessage());
+          }
+          
+          if (!lockHeld) {
+            violations = addViolation(violations, 7);
+          }
+        }
+        
+      }
+    }
+    
+    if (violations != null) {
+      log.debug("violating metadata mutation : " + new String(mutation.getRow()));
+      for (ColumnUpdate update : mutation.getUpdates()) {
+        log.debug(" update: " + new String(update.getColumnFamily()) + ":" + new String(update.getColumnQualifier()) + " value "
+            + (update.isDeleted() ? "[delete]" : new String(update.getValue())));
+      }
+    }
+    
+    return violations;
+  }
+  
+  protected Arbitrator getArbitrator() {
+    return new ZooArbitrator();
+  }
+  
+  @Override
+  public String getViolationDescription(short violationCode) {
+    switch (violationCode) {
+      case 1:
+        return "data file size must be a non-negative integer";
+      case 2:
+        return "Invalid column name given.";
+      case 3:
+        return "Prev end row is greater than or equal to end row.";
+      case 4:
+        return "Invalid metadata row format";
+      case 5:
+        return "Row can not be less than " + MetadataTable.ID;
+      case 6:
+        return "Empty values are not allowed for any " + MetadataTable.NAME + " column";
+      case 7:
+        return "Lock not held in zookeeper by writer";
+      case 8:
+        return "Bulk load transaction no longer running";
+    }
+    return null;
+  }
+  
+  @Override
+  protected void finalize() {
+    if (zooCache != null)
+      zooCache.clear();
+  }
+  
+}
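
For context, a minimal sketch of exercising the constraint; the row and column names are made up, and the Environment argument is passed as null because these checks never consult it:

    MetadataConstraints mc = new MetadataConstraints();
    Mutation m = new Mutation(new Text("5;split"));              // a tablet of table id 5
    m.put(new Text("bogus"), new Text("q"), new Value("v".getBytes())); // invalid family
    List<Short> violations = mc.check(null, m);                  // expect code 2
    if (violations != null)
      for (short code : violations)
        System.out.println(code + ": " + mc.getViolationDescription(code));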

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/data/ServerColumnUpdate.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/data/ServerColumnUpdate.java b/server/base/src/main/java/org/apache/accumulo/server/data/ServerColumnUpdate.java
new file mode 100644
index 0000000..af992a6
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/data/ServerColumnUpdate.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.data;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+
+public class ServerColumnUpdate extends ColumnUpdate {
+  
+  ServerMutation parent;
+
+  public ServerColumnUpdate(byte[] cf, byte[] cq, byte[] cv, boolean hasts, long ts, boolean deleted, byte[] val, ServerMutation serverMutation) {
+    super(cf, cq, cv, hasts, ts, deleted, val);
+    parent = serverMutation;
+  }
+
+  @Override
+  public long getTimestamp() {
+    if (hasTimestamp())
+      return super.getTimestamp();
+    return parent.getSystemTimestamp();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java b/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
new file mode 100644
index 0000000..389cc33
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Mutation that holds system time as computed by the tablet server when not provided by the user.
+ */
+public class ServerMutation extends Mutation {
+  private long systemTime = 0L;
+  
+  public ServerMutation(TMutation tmutation) {
+    super(tmutation);
+  }
+
+  public ServerMutation(Text key) {
+    super(key);
+  }
+
+  public ServerMutation() {
+  }
+
+  protected void droppingOldTimestamp(long ts) {
+    this.systemTime = ts;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    // new format writes system time with the mutation
+    if (getSerializedFormat() == SERIALIZED_FORMAT.VERSION2)
+      systemTime = WritableUtils.readVLong(in);
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    WritableUtils.writeVLong(out, systemTime);
+  }
+
+  public void setSystemTimestamp(long v) {
+    this.systemTime = v;
+  }
+  
+  public long getSystemTimestamp() {
+    return this.systemTime;
+  }
+
+  @Override
+  protected ColumnUpdate newColumnUpdate(byte[] cf, byte[] cq, byte[] cv, boolean hasts, long ts, boolean deleted, byte[] val) {
+    return new ServerColumnUpdate(cf, cq, cv, hasts, ts, deleted, val, this);
+  }
+
+  @Override
+  public long estimatedMemoryUsed() {
+    return super.estimatedMemoryUsed() + 8;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != ServerMutation.class) {
+      return false;
+    }
+    ServerMutation sm = (ServerMutation) o;
+    if (sm.systemTime != systemTime) {
+      return false;
+    }
+    return super.equals(o);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = super.hashCode();
+    result = 31 * result + (int) (systemTime & 0xffffffff);
+    return result;
+  }
+}
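
For context, a minimal sketch of the system-time fallback these two classes implement together (values are illustrative):

    ServerMutation m = new ServerMutation(new Text("row1"));
    m.put(new Text("cf"), new Text("cq"), new Value("v".getBytes()));
    m.setSystemTimestamp(42L);
    for (ColumnUpdate cu : m.getUpdates()) {
      // no explicit timestamp was set, so ServerColumnUpdate defers to the mutation
      assert cu.getTimestamp() == 42L;
    }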

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
new file mode 100644
index 0000000..b4bea4a
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * This is a glue object for converting short (relative) file references into fully qualified
+ * references. The !METADATA table may contain old relative file references; this class keeps
+ * track of the short form so the entry can be removed properly from the !METADATA table.
+ */
+public class FileRef implements Comparable<FileRef> {
+  String metaReference;  // something like ../2/d-00000/A00001.rf
+  Path fullReference;  // something like hdfs://nn:9001/accumulo/tables/2/d-00000/A00001.rf
+  
+  public FileRef(VolumeManager fs, Key key) {
+    metaReference = key.getColumnQualifier().toString();
+    fullReference = fs.getFullPath(key);
+  }
+  
+  public FileRef(String metaReference, Path fullReference) {
+    this.metaReference = metaReference;
+    this.fullReference = fullReference;
+  }
+  
+  public FileRef(String path) {
+    this.metaReference = path;
+    this.fullReference = new Path(path);
+  }
+  
+  @Override
+  public String toString() {
+    return fullReference.toString();
+  }
+  
+  public Path path() {
+    return fullReference;
+  }
+  
+  public Text meta() {
+    return new Text(metaReference);
+  }
+
+  @Override
+  public int compareTo(FileRef o) {
+    return path().compareTo(o.path());
+  }
+
+  @Override
+  public int hashCode() {
+    return path().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof FileRef) {
+      return compareTo((FileRef)obj) == 0;
+    }
+    return false;
+  }
+  
+  
+}
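
For context, a minimal sketch of the two views a FileRef keeps of one file (the paths are illustrative):

    FileRef ref = new FileRef("../2/d-00000/A00001.rf",
        new Path("hdfs://nn:9001/accumulo/tables/2/d-00000/A00001.rf"));
    Text metaEntry = ref.meta(); // the short form written to / deleted from !METADATA
    Path toOpen = ref.path();    // the full form handed to the filesystem layer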

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java
new file mode 100644
index 0000000..2760b07
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.util.Random;
+
+public class RandomVolumeChooser implements VolumeChooser {
+  Random random = new Random();
+  
+  @Override
+  public String choose(String[] options) {
+    return options[random.nextInt(options.length)];
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java
new file mode 100644
index 0000000..8713c97
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+
+public interface VolumeChooser {
+  String choose(String[] options);
+}
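
For context, a hypothetical alternative implementation (not part of this commit) showing how small a chooser can be; VolumeManagerImpl below instantiates the configured implementation via Property.GENERAL_VOLUME_CHOOSER, defaulting to RandomVolumeChooser:

    package org.apache.accumulo.server.fs;

    // Pins every new file to the first configured volume.
    public class FirstVolumeChooser implements VolumeChooser {
      @Override
      public String choose(String[] options) {
        return options[0];
      }
    }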

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
new file mode 100644
index 0000000..b7787c9
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A wrapper around multiple hadoop FileSystem objects, which are assumed to be different volumes.
+ * It also centralizes meta-operations such as waiting for SAFE_MODE and closing WALs.
+ */
+public interface VolumeManager {
+  
+  public static enum FileType {
+    TABLE(ServerConstants.TABLE_DIR), WAL(ServerConstants.WAL_DIR), RECOVERY(ServerConstants.RECOVERY_DIR);
+    
+    private String dir;
+    
+    FileType(String dir) {
+      this.dir = dir;
+    }
+    
+    public String getDirectory() {
+      return dir;
+    }
+  }
+  
+  // close the underlying FileSystems
+  void close() throws IOException;
+  
+  // the mechanism by which the master ensures that tablet servers can no longer write to a WAL
+  boolean closePossiblyOpenFile(Path path) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  FSDataOutputStream create(Path dest) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  FSDataOutputStream create(Path path, boolean overwrite) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException;
+  
+  // create a file, but only if it doesn't exist
+  boolean createNewFile(Path writable) throws IOException;
+  
+  // create a file which can be sync'd to disk
+  FSDataOutputStream createSyncable(Path logPath, int buffersize, short replication, long blockSize) throws IOException;
+  
+  // delete a file
+  boolean delete(Path path) throws IOException;
+  
+  // delete a directory and anything under it
+  boolean deleteRecursively(Path path) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  boolean exists(Path path) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  FileStatus getFileStatus(Path path) throws IOException;
+  
+  // find the appropriate FileSystem object given a path
+  FileSystem getFileSystemByPath(Path path);
+  
+  // get a mapping of volume to FileSystem
+  Map<String, ? extends FileSystem> getFileSystems();
+  
+  // return the item in options that is in the same volume as source
+  Path matchingFileSystem(Path source, String[] options);
+  
+  
+  // forward to the appropriate FileSystem object
+  FileStatus[] listStatus(Path path) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  boolean mkdirs(Path directory) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  FSDataInputStream open(Path path) throws IOException;
+  
+  // forward to the appropriate FileSystem object, throws an exception if the paths are in different volumes
+  boolean rename(Path path, Path newPath) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  boolean moveToTrash(Path sourcePath) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  short getDefaultReplication(Path logPath);
+  
+  // forward to the appropriate FileSystem object
+  boolean isFile(Path path) throws IOException;
+  
+  // all volumes are ready to provide service (not in SafeMode, for example)
+  boolean isReady() throws IOException;
+  
+  // ambiguous references to files go here
+  FileSystem getDefaultVolume();
+  
+  // forward to the appropriate FileSystem object
+  FileStatus[] globStatus(Path path) throws IOException;
+
+  // Convert a file or directory !METADATA reference into a path
+  Path getFullPath(Key key);
+  
+  Path getFullPath(String tableId, String path);
+
+  // Given a filename, figure out the qualified path given multiple namespaces
+  Path getFullPath(FileType fileType, String fileName) throws IOException;
+
+  // forward to the appropriate FileSystem object
+  ContentSummary getContentSummary(Path dir) throws IOException;
+
+  // decide on which of the given locations to create a new file
+  String choose(String[] options);
+
+}
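
For context, a minimal sketch of driving the interface; the volume URIs are illustrative:

    VolumeManager fs = VolumeManagerImpl.get();
    String volume = fs.choose(new String[] {"hdfs://nn1:9001/accumulo", "hdfs://nn2:9001/accumulo"});
    Path dir = new Path(volume + "/tables/2/default_tablet");
    fs.mkdirs(dir);
    FSDataOutputStream out = fs.create(new Path(dir, "F00000.rf"));
    out.close();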

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
new file mode 100644
index 0000000..39afe75
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+public class VolumeManagerImpl implements VolumeManager {
+
+  private static final Logger log = Logger.getLogger(VolumeManagerImpl.class);
+
+  Map<String,? extends FileSystem> volumes;
+  String defaultVolume;
+  AccumuloConfiguration conf;
+  VolumeChooser chooser;
+
+  protected VolumeManagerImpl(Map<String,? extends FileSystem> volumes, String defaultVolume, AccumuloConfiguration conf) {
+    this.volumes = volumes;
+    this.defaultVolume = defaultVolume;
+    this.conf = conf;
+    ensureSyncIsEnabled();
+    chooser = Property.createInstanceFromPropertyName(conf, Property.GENERAL_VOLUME_CHOOSER, VolumeChooser.class, new RandomVolumeChooser());
+  }
+
+  public static org.apache.accumulo.server.fs.VolumeManager getLocal() throws IOException {
+    return new VolumeManagerImpl(Collections.singletonMap("", FileSystem.getLocal(CachedConfiguration.getInstance())), "",
+        DefaultConfiguration.getDefaultConfiguration());
+  }
+
+  @Override
+  public void close() throws IOException {
+    IOException ex = null;
+    for (FileSystem fs : volumes.values()) {
+      try {
+        fs.close();
+      } catch (IOException e) {
+        ex = e;
+      }
+    }
+    if (ex != null) {
+      throw ex;
+    }
+  }
+
+  @Override
+  public boolean closePossiblyOpenFile(Path path) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    if (fs instanceof DistributedFileSystem) {
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        return dfs.recoverLease(path);
+      } catch (FileNotFoundException ex) {
+        throw ex;
+      }
+    } else if (fs instanceof LocalFileSystem) {
+      // ignore
+    } else {
+      throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
+    }
+    fs.append(path).close();
+    log.info("Recovered lease on " + path.toString() + " using append");
+    return true;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path path) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    return fs.create(path);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path path, boolean overwrite) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    return fs.create(path, overwrite);
+  }
+
+  private static long correctBlockSize(Configuration conf, long blockSize) {
+    if (blockSize <= 0)
+      blockSize = conf.getLong("dfs.block.size", 67108864);
+
+    int checkSum = conf.getInt("io.bytes.per.checksum", 512);
+    blockSize -= blockSize % checkSum;
+    blockSize = Math.max(blockSize, checkSum);
+    return blockSize;
+  }
+
+  private static int correctBufferSize(Configuration conf, int bufferSize) {
+    if (bufferSize <= 0)
+      bufferSize = conf.getInt("io.file.buffer.size", 4096);
+    return bufferSize;
+  }
+
+  @Override
+  public FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    if (bufferSize == 0) {
+      bufferSize = fs.getConf().getInt("io.file.buffer.size", 4096);
+    }
+    return fs.create(path, overwrite, bufferSize, replication, correctBlockSize(fs.getConf(), blockSize));
+  }
+
+  @Override
+  public boolean createNewFile(Path path) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    return fs.createNewFile(path);
+  }
+
+  @Override
+  public FSDataOutputStream createSyncable(Path logPath, int bufferSize, short replication, long blockSize) throws IOException {
+    FileSystem fs = getFileSystemByPath(logPath);
+    blockSize = correctBlockSize(fs.getConf(), blockSize);
+    bufferSize = correctBufferSize(fs.getConf(), bufferSize);
+    try {
+      // This...
+      // EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
+      // return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
+      // Becomes this:
+      Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
+      List<Enum<?>> flags = new ArrayList<Enum<?>>();
+      if (createFlags.isEnum()) {
+        for (Object constant : createFlags.getEnumConstants()) {
+          if (constant.toString().equals("SYNC_BLOCK")) {
+            flags.add((Enum<?>) constant);
+            log.debug("Found synch enum " + constant);
+          }
+          if (constant.toString().equals("CREATE")) {
+            flags.add((Enum<?>) constant);
+            log.debug("Found CREATE enum " + constant);
+          }
+        }
+      }
+      Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
+      log.debug("CreateFlag set: " + set);
+      Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
+      log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
+      return (FSDataOutputStream) create.invoke(fs, logPath, FsPermission.getDefault(), set, bufferSize, replication, blockSize, null);
+    } catch (ClassNotFoundException ex) {
+      // Expected in hadoop 1.0
+      return fs.create(logPath, true, bufferSize, replication, blockSize);
+    } catch (Exception ex) {
+      log.debug(ex, ex);
+      return fs.create(logPath, true, bufferSize, replication, blockSize);
+    }
+  }
+
+  @Override
+  public boolean delete(Path path) throws IOException {
+    return getFileSystemByPath(path).delete(path, false);
+  }
+
+  @Override
+  public boolean deleteRecursively(Path path) throws IOException {
+    return getFileSystemByPath(path).delete(path, true);
+  }
+
+  protected void ensureSyncIsEnabled() {
+    for (Entry<String,? extends FileSystem> entry : getFileSystems().entrySet()) {
+      final String volumeName = entry.getKey();
+      final FileSystem fs = entry.getValue();
+      
+      if (fs instanceof DistributedFileSystem) {
+        final String DFS_DURABLE_SYNC = "dfs.durable.sync", DFS_SUPPORT_APPEND = "dfs.support.append";
+        final String ticketMessage = "See ACCUMULO-623 and ACCUMULO-1637 for more details.";
+        // Check to make sure that we have proper defaults configured
+        try {
+          // If the default is off (0.20.205.x or 1.0.x)
+          DFSConfigKeys configKeys = new DFSConfigKeys();
+          
+          // Can't use the final constant itself as Java will inline it at compile time
+          Field dfsSupportAppendDefaultField = configKeys.getClass().getField("DFS_SUPPORT_APPEND_DEFAULT");
+          boolean dfsSupportAppendDefaultValue = dfsSupportAppendDefaultField.getBoolean(configKeys);
+          
+          if (!dfsSupportAppendDefaultValue) {
+            // See if the user did the correct override
+            if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, false)) {
+              String msg = "Accumulo requires that dfs.support.append to true. " + ticketMessage;
+              log.fatal(msg);
+              throw new RuntimeException(msg);
+            }
+          }
+        } catch (NoSuchFieldException e) {
+          // If we can't find DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT, the user is running
+          // 1.1.x or 1.2.x. This is ok, though, as, by default, these versions have append/sync enabled.
+        } catch (Exception e) {
+          log.warn("Error while checking for " + DFS_SUPPORT_APPEND + " on volume " + volumeName + ". The user should ensure that Hadoop is configured to properly supports append and sync. " + ticketMessage, e);
+        }
+        
+        // If either of these parameters are configured to be false, fail.
+        // This is a sign that someone is writing bad configuration.
+        if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, true) || !fs.getConf().getBoolean(DFS_DURABLE_SYNC, true)) {
+          String msg = "Accumulo requires that " + DFS_SUPPORT_APPEND + " and " + DFS_DURABLE_SYNC + " not be configured as false. " + ticketMessage;
+          log.fatal(msg);
+          throw new RuntimeException(msg);
+        }
+        
+        try {
+          // if this class exists
+          Class.forName("org.apache.hadoop.fs.CreateFlag");
+          // we're running hadoop 2.0, 1.1
+          if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
+            log.warn("dfs.datanode.synconclose set to false: data loss is possible on system reset or power loss on volume " + volumeName);
+          }
+        } catch (ClassNotFoundException ex) {
+          // hadoop 1.0
+        }
+      }
+    }
+
+  }
+
+  @Override
+  public boolean exists(Path path) throws IOException {
+    return getFileSystemByPath(path).exists(path);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    return getFileSystemByPath(path).getFileStatus(path);
+  }
+
+  @Override
+  public FileSystem getFileSystemByPath(Path path) {
+    if (path.toString().contains(":")) {
+      try {
+        return path.getFileSystem(CachedConfiguration.getInstance());
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    return volumes.get(defaultVolume);
+  }
+
+  @Override
+  public Map<String,? extends FileSystem> getFileSystems() {
+    return volumes;
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path path) throws IOException {
+    return getFileSystemByPath(path).listStatus(path);
+  }
+
+  @Override
+  public boolean mkdirs(Path path) throws IOException {
+    return getFileSystemByPath(path).mkdirs(path);
+  }
+
+  @Override
+  public FSDataInputStream open(Path path) throws IOException {
+    return getFileSystemByPath(path).open(path);
+  }
+
+  @Override
+  public boolean rename(Path path, Path newPath) throws IOException {
+    FileSystem source = getFileSystemByPath(path);
+    FileSystem dest = getFileSystemByPath(newPath);
+    if (source != dest) {
+      throw new NotImplementedException("Cannot rename files across volumes: " + path + " -> " + newPath);
+    }
+    return source.rename(path, newPath);
+  }
+
+  @Override
+  public boolean moveToTrash(Path path) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    Trash trash = new Trash(fs, fs.getConf());
+    return trash.moveToTrash(path);
+  }
+
+  @Override
+  public short getDefaultReplication(Path path) {
+    @SuppressWarnings("deprecation")
+    short rep = getFileSystemByPath(path).getDefaultReplication();
+    return rep;
+  }
+
+  @Override
+  public boolean isFile(Path path) throws IOException {
+    return getFileSystemByPath(path).isFile(path);
+  }
+
+  public static VolumeManager get() throws IOException {
+    AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
+    return get(conf);
+  }
+
+  private static final String DEFAULT = "";
+
+  public static VolumeManager get(AccumuloConfiguration conf) throws IOException {
+    Map<String,FileSystem> fileSystems = new HashMap<String,FileSystem>();
+    Configuration hadoopConf = CachedConfiguration.getInstance();
+    fileSystems.put(DEFAULT, FileUtil.getFileSystem(hadoopConf, conf));
+    String ns = conf.get(Property.INSTANCE_VOLUMES);
+    if (ns != null && !ns.isEmpty()) {
+      for (String space : ns.split(",")) {
+        if (space.equals(DEFAULT))
+          throw new IllegalArgumentException("Empty volume specified in " + Property.INSTANCE_VOLUMES.getKey());
+
+        if (space.contains(":")) {
+          fileSystems.put(space, new Path(space).getFileSystem(hadoopConf));
+        } else {
+          fileSystems.put(space, FileSystem.get(hadoopConf));
+        }
+      }
+    }
+    return new VolumeManagerImpl(fileSystems, DEFAULT, conf);
+  }
+
+  @Override
+  public boolean isReady() throws IOException {
+    for (FileSystem fs : getFileSystems().values()) {
+      if (!(fs instanceof DistributedFileSystem))
+        continue;
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
+      // Becomes this:
+      Class<?> safeModeAction;
+      try {
+        // hadoop 2.0
+        safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
+      } catch (ClassNotFoundException ex) {
+        // hadoop 1.0
+        try {
+          safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
+        } catch (ClassNotFoundException e) {
+          throw new RuntimeException("Cannot figure out the right class for Constants");
+        }
+      }
+      Object get = null;
+      for (Object obj : safeModeAction.getEnumConstants()) {
+        if (obj.toString().equals("SAFEMODE_GET"))
+          get = obj;
+      }
+      if (get == null) {
+        throw new RuntimeException("cannot find SAFEMODE_GET");
+      }
+      try {
+        Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
+        boolean inSafeMode = (Boolean) setSafeMode.invoke(dfs, get);
+        if (inSafeMode) {
+          return false;
+        }
+      } catch (Exception ex) {
+        throw new RuntimeException("cannot find method setSafeMode");
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public FileSystem getDefaultVolume() {
+    return volumes.get(defaultVolume);
+  }
+
+  @Override
+  public FileStatus[] globStatus(Path pathPattern) throws IOException {
+    return getFileSystemByPath(pathPattern).globStatus(pathPattern);
+  }
+
+  @Override
+  public Path getFullPath(Key key) {
+    // TODO sanity check col fam
+    String relPath = key.getColumnQualifierData().toString();
+    byte[] tableId = KeyExtent.tableOfMetadataRow(key.getRow());
+    return getFullPath(new String(tableId), relPath);
+  }
+
+  @Override
+  public Path matchingFileSystem(Path source, String[] options) {
+    URI uri1 = source.toUri();
+    for (String option : options) {
+      URI uri3 = URI.create(option);
+      if (uri1.getScheme().equals(uri3.getScheme())) {
+        String a1 = uri1.getAuthority();
+        String a2 = uri3.getAuthority();
+        if (a1 == a2 || (a1 != null && a1.equals(a2)))
+          return new Path(option);
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Path getFullPath(String tableId, String path) {
+    if (path.contains(":"))
+      return new Path(path);
+    
+    if (path.startsWith("../"))
+      path = path.substring(2);
+    else if (path.startsWith("/"))
+      path = "/" + tableId + path;
+    else
+      throw new IllegalArgumentException("Unexpected path prefix " + path);
+    
+    return getFullPath(FileType.TABLE, path);
+  }
+  
+  @Override
+  public Path getFullPath(FileType fileType, String path) {
+    if (path.contains(":"))
+      return new Path(path);
+    
+    // normalize the path
+    Path fullPath = new Path(ServerConstants.getDefaultBaseDir(), fileType.getDirectory());
+    if (path.startsWith("/"))
+      path = path.substring(1);
+    fullPath = new Path(fullPath, path);
+    
+    FileSystem fs = getFileSystemByPath(fullPath);
+    return fs.makeQualified(fullPath);
+  }
+
+  @Override
+  public ContentSummary getContentSummary(Path dir) throws IOException {
+    return getFileSystemByPath(dir).getContentSummary(dir);
+  }
+
+  @Override
+  public String choose(String[] options) {
+    return chooser.choose(options);
+  }
+
+}
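
For context, a minimal sketch of the path normalization in getFullPath; both the old relative form and the absolute form resolve to the same qualified path under the default base directory (the table id and file names are illustrative):

    VolumeManager fs = VolumeManagerImpl.get();
    Path a = fs.getFullPath("2", "/default_tablet/F00001.rf");
    Path b = fs.getFullPath("2", "../2/default_tablet/F00001.rf"); // old relative form
    assert a.equals(b);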

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
new file mode 100644
index 0000000..0477a44
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.init;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import jline.console.ConsoleReader;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.master.thrift.MasterGoalState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.security.SecurityUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.constraints.MetadataConstraints;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
+import org.apache.accumulo.server.security.AuditedSecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tables.TableManager;
+import org.apache.accumulo.server.tablets.TabletTime;
+import org.apache.accumulo.server.util.TablePropUtil;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * This class is used to set up the directory structure and the root tablet to get an instance started.
+ */
+public class Initialize {
+  private static final Logger log = Logger.getLogger(Initialize.class);
+  private static final String DEFAULT_ROOT_USER = "root";
+  public static final String TABLE_TABLETS_TABLET_DIR = "/table_info";
+  
+  private static ConsoleReader reader = null;
+  
+  private static ConsoleReader getConsoleReader() throws IOException {
+    if (reader == null)
+      reader = new ConsoleReader();
+    return reader;
+  }
+  
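+  // Initial settings for the root and metadata tables: small compressed blocks, 5x file
+  // replication, aggressive splitting/compaction, one version kept in every iterator scope,
+  // and a compaction-time filter that cleans up stale bulk load flags.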
+  private static HashMap<String,String> initialMetadataConf = new HashMap<String,String>();
+  static {
+    initialMetadataConf.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "32K");
+    initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), "5");
+    initialMetadataConf.put(Property.TABLE_WALOG_ENABLED.getKey(), "true");
+    initialMetadataConf.put(Property.TABLE_MAJC_RATIO.getKey(), "1");
+    initialMetadataConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "64M");
+    initialMetadataConf.put(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1", MetadataConstraints.class.getName());
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers", "10," + VersioningIterator.class.getName());
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers.opt.maxVersions", "1");
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers", "10," + VersioningIterator.class.getName());
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions", "1");
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers", "10," + VersioningIterator.class.getName());
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions", "1");
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20," + MetadataBulkLoadFilter.class.getName());
+    initialMetadataConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false");
+    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "tablet",
+        String.format("%s,%s", TabletsSection.TabletColumnFamily.NAME, TabletsSection.CurrentLocationColumnFamily.NAME));
+    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "server", String.format("%s,%s,%s,%s", DataFileColumnFamily.NAME,
+        LogColumnFamily.NAME, TabletsSection.ServerColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME));
+    initialMetadataConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "tablet,server");
+    initialMetadataConf.put(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(), "");
+    initialMetadataConf.put(Property.TABLE_INDEXCACHE_ENABLED.getKey(), "true");
+    initialMetadataConf.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true");
+  }
+  
+  public static boolean doInit(Opts opts, Configuration conf, VolumeManager fs) throws IOException {
+    if (!ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI).equals(""))
+      log.info("Hadoop Filesystem is " + ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_URI));
+    else
+      log.info("Hadoop Filesystem is " + FileSystem.getDefaultUri(conf));
+    
+    log.info("Accumulo data dirs are " + Arrays.asList(ServerConstants.getBaseDirs()));
+    log.info("Zookeeper server is " + ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST));
+    log.info("Checking if Zookeeper is available. If this hangs, then you need to make sure zookeeper is running");
+    if (!zookeeperAvailable()) {
+      log.fatal("Zookeeper needs to be up and running in order to init. Exiting ...");
+      return false;
+    }
+    if (ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_SECRET).equals(Property.INSTANCE_SECRET.getDefaultValue())) {
+      ConsoleReader c = getConsoleReader();
+      c.beep();
+      c.println();
+      c.println();
+      c.println("Warning!!! Your instance secret is still set to the default, this is not secure. We highly recommend you change it.");
+      c.println();
+      c.println();
+      c.println("You can change the instance secret in accumulo by using:");
+      c.println("   bin/accumulo " + org.apache.accumulo.server.util.ChangeSecret.class.getName() + " oldPassword newPassword.");
+      c.println("You will also need to edit your secret in your configuration file by adding the property instance.secret to your conf/accumulo-site.xml. Without this accumulo will not operate correctly");
+    }
+    
+    try {
+      if (isInitialized(fs)) {
+        log.fatal("It appears this location was previously initialized, exiting ... ");
+        return false;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    
+    // prompt the user for the instance name and root password early, so that if
+    // they abort, we don't leave an inconsistent HDFS/ZooKeeper structure behind
+    String instanceNamePath;
+    try {
+      instanceNamePath = getInstanceNamePath(opts);
+    } catch (Exception e) {
+      log.fatal("Failed to talk to zookeeper", e);
+      return false;
+    }
+    opts.rootpass = getRootPassword(opts);
+    return initialize(opts, instanceNamePath, fs);
+  }
+  
+  public static boolean initialize(Opts opts, String instanceNamePath, VolumeManager fs) {
+    
+    UUID uuid = UUID.randomUUID();
+    // the actual disk locations of the root table and tablets
+    final Path rootTablet = new Path(fs.choose(ServerConstants.getTablesDirs()) + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION);
+    try {
+      initZooKeeper(opts, uuid.toString(), instanceNamePath, rootTablet);
+    } catch (Exception e) {
+      log.fatal("Failed to initialize zookeeper", e);
+      return false;
+    }
+    
+    try {
+      initFileSystem(opts, fs, uuid, rootTablet);
+    } catch (Exception e) {
+      log.fatal("Failed to initialize filesystem", e);
+      return false;
+    }
+    
+    try {
+      initSecurity(opts, uuid.toString());
+    } catch (Exception e) {
+      log.fatal("Failed to initialize security", e);
+      return false;
+    }
+    return true;
+  }
+  
+  private static boolean zookeeperAvailable() {
+    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    try {
+      return zoo.exists("/");
+    } catch (KeeperException e) {
+      return false;
+    } catch (InterruptedException e) {
+      return false;
+    }
+  }
+  
+  private static Path[] paths(String[] paths) {
+    Path[] result = new Path[paths.length];
+    for (int i = 0; i < paths.length; i++) {
+      result[i] = new Path(paths[i]);
+    }
+    return result;
+  }
+  
+  // TODO Remove deprecation warning suppression when Hadoop1 support is dropped
+  @SuppressWarnings("deprecation")
+  private static void initFileSystem(Opts opts, VolumeManager fs, UUID uuid, Path rootTablet) throws IOException {
+    FileStatus fstat;
+
+    // the actual disk locations of the metadata table and tablets
+    final Path[] metadataTableDirs = paths(ServerConstants.getMetadataTableDirs());
+    
+    String tableMetadataTabletDir = fs.choose(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), TABLE_TABLETS_TABLET_DIR));
+    String defaultMetadataTabletDir = fs.choose(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), Constants.DEFAULT_TABLET_LOCATION));
+
+    fs.mkdirs(new Path(ServerConstants.getDataVersionLocation(), "" + ServerConstants.DATA_VERSION));
+    
+    // create an instance id
+    fs.mkdirs(ServerConstants.getInstanceIdLocation());
+    fs.createNewFile(new Path(ServerConstants.getInstanceIdLocation(), uuid.toString()));
+    
+    // initialize initial metadata config in zookeeper
+    initMetadataConfig();
+    
+    // create metadata table
+    for (Path mtd : metadataTableDirs) {
+      try {
+        fstat = fs.getFileStatus(mtd);
+        if (!fstat.isDir()) {
+          log.fatal("location " + mtd.toString() + " exists but is not a directory");
+          return;
+        }
+      } catch (FileNotFoundException fnfe) {
+        if (!fs.mkdirs(mtd)) {
+          log.fatal("unable to create directory " + mtd.toString());
+          return;
+        }
+      }
+    }
+    
+    // create root table and tablet
+    try {
+      fstat = fs.getFileStatus(rootTablet);
+      if (!fstat.isDir()) {
+        log.fatal("location " + rootTablet.toString() + " exists but is not a directory");
+        return;
+      }
+    } catch (FileNotFoundException fnfe) {
+      if (!fs.mkdirs(rootTablet)) {
+        log.fatal("unable to create directory " + rootTablet.toString());
+        return;
+      }
+    }
+    
+    // populate the root tablet with info about the default tablet
+    // the root tablet contains the key extent and locations of all the
+    // metadata tablets
+    String initRootTabFile = rootTablet + "/00000_00000." + FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration());
+    FileSystem ns = fs.getFileSystemByPath(new Path(initRootTabFile));
+    FileSKVWriter mfw = FileOperations.getInstance().openWriter(initRootTabFile, ns, ns.getConf(), AccumuloConfiguration.getDefaultConfiguration());
+    mfw.startDefaultLocalityGroup();
+    
+    Text tableExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), MetadataSchema.TabletsSection.getRange().getEndKey().getRow()));
+    
+    // table tablet's directory
+    Key tableDirKey = new Key(tableExtent, TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily(),
+        TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), 0);
+    mfw.append(tableDirKey, new Value(tableMetadataTabletDir.getBytes()));
+    
+    // table tablet time
+    Key tableTimeKey = new Key(tableExtent, TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnFamily(),
+        TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnQualifier(), 0);
+    mfw.append(tableTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
+    
+    // table tablet's prevrow
+    Key tablePrevRowKey = new Key(tableExtent, TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnFamily(),
+        TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnQualifier(), 0);
+    mfw.append(tablePrevRowKey, KeyExtent.encodePrevEndRow(null));
+    
+    // default tablet info
+    Text defaultExtent = new Text(KeyExtent.getMetadataEntry(new Text(MetadataTable.ID), null));
+    
+    // default's directory
+    Key defaultDirKey = new Key(defaultExtent, TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily(),
+        TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), 0);
+    mfw.append(defaultDirKey, new Value(defaultMetadataTabletDir.getBytes()));
+    
+    // default's time
+    Key defaultTimeKey = new Key(defaultExtent, TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnFamily(),
+        TabletsSection.ServerColumnFamily.TIME_COLUMN.getColumnQualifier(), 0);
+    mfw.append(defaultTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
+    
+    // default's prevrow
+    Key defaultPrevRowKey = new Key(defaultExtent, TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnFamily(),
+        TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.getColumnQualifier(), 0);
+    mfw.append(defaultPrevRowKey, KeyExtent.encodePrevEndRow(MetadataSchema.TabletsSection.getRange().getEndKey().getRow()));
+    
+    mfw.close();
+    
+    // create table and default tablets directories
+    for (String s : Arrays.asList(tableMetadataTabletDir, defaultMetadataTabletDir)) {
+      Path dir = new Path(s);
+      try {
+        fstat = fs.getFileStatus(dir);
+        if (!fstat.isDir()) {
+          log.fatal("location " + dir.toString() + " exists but is not a directory");
+          return;
+        }
+      } catch (FileNotFoundException fnfe) {
+        // create the table info / default tablet directory
+        if (!fs.mkdirs(dir)) {
+          log.fatal("unable to create directory " + dir.toString());
+          return;
+        }
+      }
+    }
+  }
+  
+  private static void initZooKeeper(Opts opts, String uuid, String instanceNamePath, Path rootTablet) throws KeeperException, InterruptedException {
+    // setup basic data in zookeeper
+    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    ZooUtil.putPersistentData(zoo.getZooKeeper(), Constants.ZROOT, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE);
+    ZooUtil.putPersistentData(zoo.getZooKeeper(), Constants.ZROOT + Constants.ZINSTANCES, new byte[0], -1, NodeExistsPolicy.SKIP, Ids.OPEN_ACL_UNSAFE);
+    
+    // setup instance name
+    if (opts.clearInstanceName)
+      zoo.recursiveDelete(instanceNamePath, NodeMissingPolicy.SKIP);
+    zoo.putPersistentData(instanceNamePath, uuid.getBytes(), NodeExistsPolicy.FAIL);
+    
+    // setup the instance
+    String zkInstanceRoot = Constants.ZROOT + "/" + uuid;
+    zoo.putPersistentData(zkInstanceRoot, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLES, Constants.ZTABLES_INITIAL_ID, NodeExistsPolicy.FAIL);
+    TableManager.prepareNewTableState(uuid, RootTable.ID, RootTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL);
+    TableManager.prepareNewTableState(uuid, MetadataTable.ID, MetadataTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZTSERVERS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_PATH, rootTablet.toString().getBytes(), NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZTRACERS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, MasterGoalState.NORMAL.toString().getBytes(), NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZGC, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZGC_LOCK, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZCONFIG, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLE_LOCKS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZHDFS_RESERVATIONS, new byte[0], NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZNEXT_FILE, new byte[] {'0'}, NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZRECOVERY, new byte[] {'0'}, NodeExistsPolicy.FAIL);
+  }
+  
+  private static String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException {
+    // setup the instance name
+    String instanceName, instanceNamePath = null;
+    boolean exists = true;
+    do {
+      if (opts.cliInstanceName == null) {
+        instanceName = getConsoleReader().readLine("Instance name : ");
+      } else {
+        instanceName = opts.cliInstanceName;
+      }
+      if (instanceName == null)
+        System.exit(0);
+      instanceName = instanceName.trim();
+      if (instanceName.length() == 0)
+        continue;
+      instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + instanceName;
+      if (opts.clearInstanceName) {
+        exists = false;
+        break;
+      } else if (exists = ZooReaderWriter.getInstance().exists(instanceNamePath)) {
+        String decision = getConsoleReader().readLine("Instance name \"" + instanceName + "\" exists. Delete existing entry from zookeeper? [Y/N] : ");
+        if (decision == null)
+          System.exit(0);
+        if (decision.length() == 1 && decision.toLowerCase(Locale.ENGLISH).charAt(0) == 'y') {
+          opts.clearInstanceName = true;
+          exists = false;
+        }
+      }
+    } while (exists);
+    return instanceNamePath;
+  }
+  
+  private static byte[] getRootPassword(Opts opts) throws IOException {
+    if (opts.cliPassword != null) {
+      return opts.cliPassword.getBytes();
+    }
+    String rootpass;
+    String confirmpass;
+    do {
+      rootpass = getConsoleReader()
+          .readLine("Enter initial password for " + DEFAULT_ROOT_USER + " (this may not be applicable for your security setup): ", '*');
+      if (rootpass == null)
+        System.exit(0);
+      confirmpass = getConsoleReader().readLine("Confirm initial password for " + DEFAULT_ROOT_USER + ": ", '*');
+      if (confirmpass == null)
+        System.exit(0);
+      if (!rootpass.equals(confirmpass))
+        log.error("Passwords do not match");
+    } while (!rootpass.equals(confirmpass));
+    return rootpass.getBytes();
+  }
+  
+  private static void initSecurity(Opts opts, String iid) throws AccumuloSecurityException, ThriftSecurityException {
+    AuditedSecurityOperation.getInstance(iid, true).initializeSecurity(SystemCredentials.get().toThrift(HdfsZooInstance.getInstance()), DEFAULT_ROOT_USER,
+        opts.rootpass);
+  }
+  
+  public static void initMetadataConfig(String tableId) throws IOException {
+    try {
+      Configuration conf = CachedConfiguration.getInstance();
+      int max = conf.getInt("dfs.replication.max", 512);
+      // Hadoop 0.23 switched the min value configuration name
+      int min = Math.max(conf.getInt("dfs.replication.min", 1), conf.getInt("dfs.namenode.replication.min", 1));
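+      // The metadata tables default to a replication of 5 (set above); if the cluster's
+      // configured max is lower or min is higher, prompt for a compatible value.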
+      if (max < 5)
+        setMetadataReplication(max, "max");
+      if (min > 5)
+        setMetadataReplication(min, "min");
+      for (Entry<String,String> entry : initialMetadataConf.entrySet()) {
+        if (!TablePropUtil.setTableProperty(RootTable.ID, entry.getKey(), entry.getValue()))
+          throw new IOException("Cannot create per-table property " + entry.getKey());
+        if (!TablePropUtil.setTableProperty(MetadataTable.ID, entry.getKey(), entry.getValue()))
+          throw new IOException("Cannot create per-table property " + entry.getKey());
+      }
+    } catch (Exception e) {
+      log.fatal("error talking to zookeeper", e);
+      throw new IOException(e);
+    }
+  }
+  
+  protected static void initMetadataConfig() throws IOException {
+    initMetadataConfig(RootTable.ID);
+    initMetadataConfig(MetadataTable.ID);
+  }
+  
+  private static void setMetadataReplication(int replication, String reason) throws IOException {
+    String rep = getConsoleReader().readLine(
+        "Your HDFS replication " + reason + " is not compatible with our default " + MetadataTable.NAME + " replication of 5. What do you want to set your "
+            + MetadataTable.NAME + " replication to? (" + replication + ") ");
+    if (rep == null || rep.length() == 0)
+      rep = Integer.toString(replication);
+    else
+      // Let's make sure it's a number
+      Integer.parseInt(rep);
+    initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), rep);
+  }
+  
+  public static boolean isInitialized(VolumeManager fs) throws IOException {
+    return (fs.exists(ServerConstants.getInstanceIdLocation()) || fs.exists(ServerConstants.getDataVersionLocation()));
+  }
+  
+  static class Opts extends Help {
+    @Parameter(names = "--reset-security", description = "just update the security information")
+    boolean resetSecurity = false;
+    @Parameter(names = "--clear-instance-name", description = "delete any existing instance name without prompting")
+    boolean clearInstanceName = false;
+    @Parameter(names = "--instance-name", description = "the instance name, if not provided, will prompt")
+    String cliInstanceName;
+    @Parameter(names = "--password", description = "set the password on the command line")
+    String cliPassword;
+    
+    byte[] rootpass = null;
+  }
+  
+  public static void main(String[] args) {
+    Opts opts = new Opts();
+    opts.parseArgs(Initialize.class.getName(), args);
+    
+    try {
+      SecurityUtil.serverLogin();
+      Configuration conf = CachedConfiguration.getInstance();
+      
+      @SuppressWarnings("deprecation")
+      VolumeManager fs = VolumeManagerImpl.get(SiteConfiguration.getSiteConfiguration());
+      
+      if (opts.resetSecurity) {
+        if (isInitialized(fs)) {
+          opts.rootpass = getRootPassword(opts);
+          initSecurity(opts, HdfsZooInstance.getInstance().getInstanceID());
+        } else {
+          log.fatal("Attempted to reset security on accumulo before it was initialized");
+        }
+      } else if (!doInit(opts, conf, fs))
+        System.exit(-1);
+    } catch (Exception e) {
+      log.fatal(e, e);
+      throw new RuntimeException(e);
+    }
+  }
+}
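
As a usage note, initialization is normally run through the launcher script
("bin/accumulo init"), but the same entry point can be exercised directly. A
minimal sketch, with a hypothetical instance name and password:

    public class InitExample {
      public static void main(String[] args) throws Exception {
        // Equivalent to: bin/accumulo init --instance-name test --password secret
        // --clear-instance-name deletes a leftover instance name without prompting.
        org.apache.accumulo.server.init.Initialize.main(new String[] {
            "--instance-name", "test", "--password", "secret", "--clear-instance-name"});
      }
    }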

http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
new file mode 100644
index 0000000..d8bcebe
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.iterators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.log4j.Logger;
+
+/**
+ * A special iterator for the metadata table that removes inactive bulk load flags.
+ */
+public class MetadataBulkLoadFilter extends Filter {
+  private static final Logger log = Logger.getLogger(MetadataBulkLoadFilter.class);
+  
+  enum Status {
+    ACTIVE, INACTIVE
+  }
+  
+  Map<Long,Status> bulkTxStatusCache;
+  Arbitrator arbitrator;
+  
+  @Override
+  public boolean accept(Key k, Value v) {
+    if (!k.isDeleted() && k.compareColumnFamily(TabletsSection.BulkFileColumnFamily.NAME) == 0) {
+      long txid = Long.parseLong(v.toString());
+      
+      Status status = bulkTxStatusCache.get(txid);
+      if (status == null) {
+        try {
+          if (arbitrator.transactionComplete(Constants.BULK_ARBITRATOR_TYPE, txid)) {
+            status = Status.INACTIVE;
+          } else {
+            status = Status.ACTIVE;
+          }
+        } catch (Exception e) {
+          status = Status.ACTIVE;
+          log.error(e, e);
+        }
+        
+        bulkTxStatusCache.put(txid, status);
+      }
+      
+      return status == Status.ACTIVE;
+    }
+    
+    return true;
+  }
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    
+    if (env.getIteratorScope() == IteratorScope.scan) {
+      throw new IOException("This iterator not intended for use at scan time");
+    }
+    
+    bulkTxStatusCache = new HashMap<Long,MetadataBulkLoadFilter.Status>();
+    arbitrator = getArbitrator();
+  }
+  
+  protected Arbitrator getArbitrator() {
+    return new ZooArbitrator();
+  }
+}
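
For context, Initialize wires this filter into the root and metadata tables via the
per-table property "table.iterator.majc.bulkLoadFilter" (see its static block above).
Attaching the same iterator by hand would look roughly like the sketch below; the
connector, table name, and iterator name are illustrative:

    import java.util.EnumSet;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.IteratorSetting;
    import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
    import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;

    public class AttachBulkLoadFilter {
      public static void attach(Connector conn, String table) throws Exception {
        // Priority 20, major compaction scope only; init() rejects use at scan time.
        IteratorSetting setting = new IteratorSetting(20, "bulkLoadFilter", MetadataBulkLoadFilter.class);
        conn.tableOperations().attachIterator(table, setting, EnumSet.of(IteratorScope.majc));
      }
    }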