Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/06/04 20:53:17 UTC

[36/43] accumulo git commit: ACCUMULO-3871 move ITs into distro jar, stop building test jar

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/VolumeChooserIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeChooserIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeChooserIT.java
new file mode 100644
index 0000000..c2dee9f
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeChooserIT.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.fs.PerTableVolumeChooser;
+import org.apache.accumulo.server.fs.PreferredVolumeChooser;
+import org.apache.accumulo.server.fs.RandomVolumeChooser;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ * Verifies that volume choosers configured per table or namespace (preferred, random, or none) place new files on the expected volumes.
+ */
+public class VolumeChooserIT extends ConfigurableMacBase {
+
+  private static final Text EMPTY = new Text();
+  private static final Value EMPTY_VALUE = new Value(new byte[] {});
+  private File volDirBase;
+  private Path v1, v2, v3, v4;
+  private String[] rows = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(",");
+  private String namespace1;
+  private String namespace2;
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 30;
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // Get 2 tablet servers
+    cfg.setNumTservers(2);
+    namespace1 = "ns_" + getUniqueNames(2)[0];
+    namespace2 = "ns_" + getUniqueNames(2)[1];
+
+    // Set the general volume chooser to the PerTableVolumeChooser so that different choosers can be specified
+    Map<String,String> siteConfig = new HashMap<String,String>();
+    siteConfig.put(Property.GENERAL_VOLUME_CHOOSER.getKey(), PerTableVolumeChooser.class.getName());
+    cfg.setSiteConfig(siteConfig);
+
+    // Set up 4 different volume paths
+    File baseDir = cfg.getDir();
+    volDirBase = new File(baseDir, "volumes");
+    File v1f = new File(volDirBase, "v1");
+    File v2f = new File(volDirBase, "v2");
+    File v3f = new File(volDirBase, "v3");
+    File v4f = new File(volDirBase, "v4");
+    v1 = new Path("file://" + v1f.getAbsolutePath());
+    v2 = new Path("file://" + v2f.getAbsolutePath());
+    v3 = new Path("file://" + v3f.getAbsolutePath());
+    v4 = new Path("file://" + v4f.getAbsolutePath());
+
+    // Only add volumes 1, 2, and 4 to the list of instance volumes; v3 is deliberately left out so a chooser can be pointed at a volume that is not an instance volume
+    cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString() + "," + v4.toString());
+
+    // use raw local file system so walogs sync and flush will work
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+
+    super.configure(cfg, hadoopCoreSite);
+
+  }
+
+  public void addSplits(Connector connector, String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    // Add 10 splits to the table
+    SortedSet<Text> partitions = new TreeSet<Text>();
+    for (String s : "b,e,g,j,l,o,q,t,v,y".split(","))
+      partitions.add(new Text(s));
+    connector.tableOperations().addSplits(tableName, partitions);
+  }
+
+  public void writeAndReadData(Connector connector, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    // Write some data to the table
+    BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+    for (String s : rows) {
+      Mutation m = new Mutation(new Text(s));
+      m.put(EMPTY, EMPTY, EMPTY_VALUE);
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    // Write the data to disk, read it back
+    connector.tableOperations().flush(tableName, null, null, true);
+    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+    int i = 0;
+    for (Entry<Key,Value> entry : scanner) {
+      assertEquals("Data read is not data written", rows[i++], entry.getKey().getRow().toString());
+    }
+  }
+
+  public void verifyVolumes(Connector connector, String tableName, Range tableRange, String vol) throws TableNotFoundException {
+    // Verify the new files are written to the Volumes specified
+    ArrayList<String> volumes = new ArrayList<String>();
+    for (String s : vol.split(","))
+      volumes.add(s);
+
+    Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.setRange(tableRange);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    int fileCount = 0;
+    for (Entry<Key,Value> entry : scanner) {
+      boolean inVolume = false;
+      for (String volume : volumes) {
+        if (entry.getKey().getColumnQualifier().toString().contains(volume))
+          inVolume = true;
+      }
+      assertTrue("Data not written to the correct volumes", inVolume);
+      fileCount++;
+    }
+    assertEquals("Wrong number of files", 11, fileCount);
+  }
+
+  // Test that uses two tables with 10 split points each. They each use the PreferredVolumeChooser to choose volumes.
+  @Test
+  public void twoTablesPreferredVolumeChooser() throws Exception {
+    log.info("Starting twoTablesPreferredVolumeChooser");
+
+    // Create namespace
+    Connector connector = getConnector();
+    connector.namespaceOperations().create(namespace1);
+
+    // Set properties on the namespace
+    String propertyName = Property.TABLE_VOLUME_CHOOSER.getKey();
+    String volume = PreferredVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace1, propertyName, volume);
+
+    propertyName = "table.custom.preferredVolumes";
+    volume = v2.toString();
+    connector.namespaceOperations().setProperty(namespace1, propertyName, volume);
+
+    // Create table1 on namespace1
+    String tableName = namespace1 + ".1";
+    connector.tableOperations().create(tableName);
+    String tableID = connector.tableOperations().tableIdMap().get(tableName);
+
+    // Add 10 splits to the table
+    addSplits(connector, tableName);
+    // Write some data to the table
+    writeAndReadData(connector, tableName);
+    // Verify the new files are written to the Volumes specified
+    verifyVolumes(connector, tableName, TabletsSection.getRange(tableID), volume);
+
+    connector.namespaceOperations().create(namespace2);
+
+    // Set properties on the namespace
+    propertyName = Property.TABLE_VOLUME_CHOOSER.getKey();
+    volume = PreferredVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace2, propertyName, volume);
+
+    propertyName = "table.custom.preferredVolumes";
+    volume = v1.toString();
+    connector.namespaceOperations().setProperty(namespace2, propertyName, volume);
+
+    // Create table2 on namespace2
+    String tableName2 = namespace2 + ".1";
+
+    connector.tableOperations().create(tableName2);
+    String tableID2 = connector.tableOperations().tableIdMap().get(tableName2);
+
+    // Add 10 splits to the table
+    addSplits(connector, tableName2);
+    // Write some data to the table
+    writeAndReadData(connector, tableName2);
+    // Verify the new files are written to the Volumes specified
+    verifyVolumes(connector, tableName2, TabletsSection.getRange(tableID2), volume);
+  }
+
+  // Test that uses two tables with 10 split points each. They each use the RandomVolumeChooser to choose volumes.
+  @Test
+  public void twoTablesRandomVolumeChooser() throws Exception {
+    log.info("Starting twoTablesRandomVolumeChooser()");
+
+    // Create namespace
+    Connector connector = getConnector();
+    connector.namespaceOperations().create(namespace1);
+
+    // Set properties on the namespace
+    String propertyName = Property.TABLE_VOLUME_CHOOSER.getKey();
+    String volume = RandomVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace1, propertyName, volume);
+
+    // Create table1 on namespace1
+    String tableName = namespace1 + ".1";
+    connector.tableOperations().create(tableName);
+    String tableID = connector.tableOperations().tableIdMap().get(tableName);
+
+    // Add 10 splits to the table
+    addSplits(connector, tableName);
+    // Write some data to the table
+    writeAndReadData(connector, tableName);
+    // Verify the new files are written to the Volumes specified
+
+    verifyVolumes(connector, tableName, TabletsSection.getRange(tableID), v1.toString() + "," + v2.toString() + "," + v4.toString());
+
+    connector.namespaceOperations().create(namespace2);
+
+    // Set properties on the namespace
+    propertyName = Property.TABLE_VOLUME_CHOOSER.getKey();
+    volume = RandomVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace2, propertyName, volume);
+
+    // Create table2 on namespace2
+    String tableName2 = namespace2 + ".1";
+    connector.tableOperations().create(tableName2);
+    String tableID2 = connector.tableOperations().tableIdMap().get(tableName2);
+
+    // Add 10 splits to the table
+    addSplits(connector, tableName2);
+    // Write some data to the table
+    writeAndReadData(connector, tableName2);
+    // Verify the new files are written to the Volumes specified
+    verifyVolumes(connector, tableName2, TabletsSection.getRange(tableID2), v1.toString() + "," + v2.toString() + "," + v4.toString());
+  }
+
+  // Test that uses two tables with 10 split points each. The first uses the RandomVolumeChooser and the second uses the
+  // PreferredVolumeChooser to choose volumes.
+  @Test
+  public void twoTablesDiffChoosers() throws Exception {
+    log.info("Starting twoTablesDiffChoosers");
+
+    // Create namespace
+    Connector connector = getConnector();
+    connector.namespaceOperations().create(namespace1);
+
+    // Set properties on the namespace
+    String propertyName = Property.TABLE_VOLUME_CHOOSER.getKey();
+    String volume = RandomVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace1, propertyName, volume);
+
+    // Create table1 on namespace1
+    String tableName = namespace1 + ".1";
+    connector.tableOperations().create(tableName);
+    String tableID = connector.tableOperations().tableIdMap().get(tableName);
+
+    // Add 10 splits to the table
+    addSplits(connector, tableName);
+    // Write some data to the table
+    writeAndReadData(connector, tableName);
+    // Verify the new files are written to the Volumes specified
+
+    verifyVolumes(connector, tableName, TabletsSection.getRange(tableID), v1.toString() + "," + v2.toString() + "," + v4.toString());
+
+    connector.namespaceOperations().create(namespace2);
+
+    // Set properties on the namespace
+    propertyName = Property.TABLE_VOLUME_CHOOSER.getKey();
+    volume = PreferredVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace2, propertyName, volume);
+
+    propertyName = "table.custom.preferredVolumes";
+    volume = v1.toString();
+    connector.namespaceOperations().setProperty(namespace2, propertyName, volume);
+
+    // Create table2 on namespace2
+    String tableName2 = namespace2 + ".1";
+    connector.tableOperations().create(tableName2);
+    String tableID2 = connector.tableOperations().tableIdMap().get(tableName2);
+
+    // Add 10 splits to the table
+    addSplits(connector, tableName2);
+    // Write some data to the table
+    writeAndReadData(connector, tableName2);
+    // Verify the new files are written to the Volumes specified
+    verifyVolumes(connector, tableName2, TabletsSection.getRange(tableID2), volume);
+  }
+
+  // Test that uses one table with 10 split points. It uses the PreferredVolumeChooser, but no preferred volume is specified. This means that the volume
+  // is chosen randomly from all instance volumes.
+  @Test
+  public void missingVolumePreferredVolumeChooser() throws Exception {
+    log.info("Starting missingVolumePreferredVolumeChooser");
+
+    // Create namespace
+    Connector connector = getConnector();
+    connector.namespaceOperations().create(namespace1);
+
+    // Set properties on the namespace
+    String propertyName = Property.TABLE_VOLUME_CHOOSER.getKey();
+    String volume = PreferredVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace1, propertyName, volume);
+
+    // Create table1 on namespace1
+    String tableName = namespace1 + ".1";
+    connector.tableOperations().create(tableName);
+    String tableID = connector.tableOperations().tableIdMap().get(tableName);
+
+    // Add 10 splits to the table
+    addSplits(connector, tableName);
+    // Write some data to the table
+    writeAndReadData(connector, tableName);
+    // Verify the new files are written to the Volumes specified
+    verifyVolumes(connector, tableName, TabletsSection.getRange(tableID), v1.toString() + "," + v2.toString() + "," + v4.toString());
+  }
+
+  // Test that uses one table with 10 split points. It uses the PreferredVolumeChooser, but the preferred volume is not an instance volume. This means that
+  // the volume is chosen randomly from all instance volumes.
+  @Test
+  public void notInstancePreferredVolumeChooser() throws Exception {
+    log.info("Starting notInstancePreferredVolumeChooser");
+
+    // Create namespace
+    Connector connector = getConnector();
+    connector.namespaceOperations().create(namespace1);
+
+    // Set properties on the namespace
+    String propertyName = Property.TABLE_VOLUME_CHOOSER.getKey();
+    String volume = PreferredVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace1, propertyName, volume);
+
+    propertyName = "table.custom.preferredVolumes";
+    volume = v3.toString();
+    connector.namespaceOperations().setProperty(namespace1, propertyName, volume);
+
+    // Create table1 on namespace1
+    String tableName = namespace1 + ".1";
+    connector.tableOperations().create(tableName);
+    String tableID = connector.tableOperations().tableIdMap().get(tableName);
+
+    // Add 10 splits to the table
+    addSplits(connector, tableName);
+    // Write some data to the table
+    writeAndReadData(connector, tableName);
+    // Verify the new files are written to the Volumes specified
+    verifyVolumes(connector, tableName, TabletsSection.getRange(tableID), v1.toString() + "," + v2.toString() + "," + v4.toString());
+  }
+
+  // Test that uses one table with 10 split points. It does not specify a chooser, so the volume is chosen randomly from all instance volumes.
+  @Test
+  public void chooserNotSpecified() throws Exception {
+    log.info("Starting chooserNotSpecified");
+
+    // Create a table
+    Connector connector = getConnector();
+    String tableName = getUniqueNames(2)[0];
+    connector.tableOperations().create(tableName);
+    String tableID = connector.tableOperations().tableIdMap().get(tableName);
+
+    // Add 10 splits to the table
+    addSplits(connector, tableName);
+    // Write some data to the table
+    writeAndReadData(connector, tableName);
+
+    // Verify the new files are written to the Volumes specified
+    verifyVolumes(connector, tableName, TabletsSection.getRange(tableID), v1.toString() + "," + v2.toString() + "," + v4.toString());
+  }
+
+}
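
A minimal sketch (not part of this commit) of applying the same chooser configuration directly to a table rather than a namespace, using only calls and properties exercised by the test above; the table name and volume path are illustrative placeholders:

    // Assumes PerTableVolumeChooser is set as the general chooser, as in configure() above
    Connector connector = getConnector();
    String table = "chooserExample"; // placeholder table name
    connector.tableOperations().create(table);
    connector.tableOperations().setProperty(table, Property.TABLE_VOLUME_CHOOSER.getKey(),
        PreferredVolumeChooser.class.getName());
    // same custom property the test sets on namespaces, pointed at a placeholder volume
    connector.tableOperations().setProperty(table, "table.custom.preferredVolumes", "file:///data/v2");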

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
new file mode 100644
index 0000000..c25370d
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.DiskUsage;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.init.Initialize;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
+import org.apache.accumulo.server.util.Admin;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class VolumeIT extends ConfigurableMacBase {
+
+  private static final Text EMPTY = new Text();
+  private static final Value EMPTY_VALUE = new Value(new byte[] {});
+  private File volDirBase;
+  private Path v1, v2;
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 5 * 60;
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    File baseDir = cfg.getDir();
+    volDirBase = new File(baseDir, "volumes");
+    File v1f = new File(volDirBase, "v1");
+    File v2f = new File(volDirBase, "v2");
+    v1 = new Path("file://" + v1f.getAbsolutePath());
+    v2 = new Path("file://" + v2f.getAbsolutePath());
+
+    // Run MAC on two locations in the local file system
+    URI v1Uri = v1.toUri();
+    cfg.setProperty(Property.INSTANCE_DFS_DIR, v1Uri.getPath());
+    cfg.setProperty(Property.INSTANCE_DFS_URI, v1Uri.getScheme() + v1Uri.getHost());
+    cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString());
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+
+    // use raw local file system so walogs sync and flush will work
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+
+    super.configure(cfg, hadoopCoreSite);
+  }
+
+  @Test
+  public void test() throws Exception {
+    // create a table
+    Connector connector = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    connector.tableOperations().create(tableName);
+    SortedSet<Text> partitions = new TreeSet<Text>();
+    // with some splits
+    for (String s : "d,m,t".split(","))
+      partitions.add(new Text(s));
+    connector.tableOperations().addSplits(tableName, partitions);
+    // scribble over the splits
+    BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+    String[] rows = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(",");
+    for (String s : rows) {
+      Mutation m = new Mutation(new Text(s));
+      m.put(EMPTY, EMPTY, EMPTY_VALUE);
+      bw.addMutation(m);
+    }
+    bw.close();
+    // write the data to disk, read it back
+    connector.tableOperations().flush(tableName, null, null, true);
+    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+    int i = 0;
+    for (Entry<Key,Value> entry : scanner) {
+      assertEquals(rows[i++], entry.getKey().getRow().toString());
+    }
+    // verify the new files are written to the different volumes
+    scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.setRange(new Range("1", "1<"));
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    int fileCount = 0;
+
+    for (Entry<Key,Value> entry : scanner) {
+      boolean inV1 = entry.getKey().getColumnQualifier().toString().contains(v1.toString());
+      boolean inV2 = entry.getKey().getColumnQualifier().toString().contains(v2.toString());
+      assertTrue(inV1 || inV2);
+      fileCount++;
+    }
+    assertEquals(4, fileCount);
+    List<DiskUsage> diskUsage = connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
+    assertEquals(1, diskUsage.size());
+    long usage = diskUsage.get(0).getUsage().longValue();
+    System.out.println("usage " + usage);
+    assertTrue(usage > 700 && usage < 800);
+  }
+
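+  // Scans with the supplied scanner and asserts that the row:cf:cq:value entries match the expected list, ignoring order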
+  private void verifyData(List<String> expected, Scanner createScanner) {
+
+    List<String> actual = new ArrayList<String>();
+
+    for (Entry<Key,Value> entry : createScanner) {
+      Key k = entry.getKey();
+      actual.add(k.getRow() + ":" + k.getColumnFamily() + ":" + k.getColumnQualifier() + ":" + entry.getValue());
+    }
+
+    Collections.sort(expected);
+    Collections.sort(actual);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testRelativePaths() throws Exception {
+
+    List<String> expected = new ArrayList<String>();
+
+    Connector connector = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    connector.tableOperations().create(tableName, new NewTableConfiguration().withoutDefaultIterators());
+
+    String tableId = connector.tableOperations().tableIdMap().get(tableName);
+
+    SortedSet<Text> partitions = new TreeSet<Text>();
+    // with some splits
+    for (String s : "c,g,k,p,s,v".split(","))
+      partitions.add(new Text(s));
+
+    connector.tableOperations().addSplits(tableName, partitions);
+
+    BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+
+    // create two files in each tablet
+
+    String[] rows = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(",");
+    for (String s : rows) {
+      Mutation m = new Mutation(s);
+      m.put("cf1", "cq1", "1");
+      bw.addMutation(m);
+      expected.add(s + ":cf1:cq1:1");
+    }
+
+    bw.flush();
+    connector.tableOperations().flush(tableName, null, null, true);
+
+    for (String s : rows) {
+      Mutation m = new Mutation(s);
+      m.put("cf1", "cq1", "2");
+      bw.addMutation(m);
+      expected.add(s + ":cf1:cq1:2");
+    }
+
+    bw.close();
+    connector.tableOperations().flush(tableName, null, null, true);
+
+    verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY));
+
+    connector.tableOperations().offline(tableName, true);
+
+    connector.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+
+    Scanner metaScanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+    metaScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+
+    BatchWriter mbw = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+
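+    // Rewrite each file reference on v1 to a relative path ("/<tablet dir>/<file name>") so the test also covers older-style relative metadata entries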
+    for (Entry<Key,Value> entry : metaScanner) {
+      String cq = entry.getKey().getColumnQualifier().toString();
+      if (cq.startsWith(v1.toString())) {
+        Path path = new Path(cq);
+        String relPath = "/" + path.getParent().getName() + "/" + path.getName();
+        Mutation fileMut = new Mutation(entry.getKey().getRow());
+        fileMut.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
+        fileMut.put(entry.getKey().getColumnFamily().toString(), relPath, entry.getValue().toString());
+        mbw.addMutation(fileMut);
+      }
+    }
+
+    mbw.close();
+
+    connector.tableOperations().online(tableName, true);
+
+    verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY));
+
+    connector.tableOperations().compact(tableName, null, null, true, true);
+
+    verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY));
+
+    for (Entry<Key,Value> entry : metaScanner) {
+      String cq = entry.getKey().getColumnQualifier().toString();
+      Path path = new Path(cq);
+      Assert.assertTrue("relative path not deleted " + path.toString(), path.depth() > 2);
+    }
+
+  }
+
+  @Test
+  public void testAddVolumes() throws Exception {
+
+    String[] tableNames = getUniqueNames(2);
+
+    // grab this before shutting down cluster
+    String uuid = new ZooKeeperInstance(cluster.getClientConfig()).getInstanceID();
+
+    verifyVolumesUsed(tableNames[0], false, v1, v2);
+
+    Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+    cluster.stop();
+
+    Configuration conf = new Configuration(false);
+    conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+
+    File v3f = new File(volDirBase, "v3");
+    assertTrue(v3f.mkdir() || v3f.isDirectory());
+    Path v3 = new Path("file://" + v3f.getAbsolutePath());
+
+    conf.set(Property.INSTANCE_VOLUMES.getKey(), v1.toString() + "," + v2.toString() + "," + v3.toString());
+    BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")));
+    conf.writeXml(fos);
+    fos.close();
+
+    // initialize volume
+    Assert.assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").waitFor());
+
+    // check that all volumes are initialized
+    for (Path volumePath : Arrays.asList(v1, v2, v3)) {
+      FileSystem fs = volumePath.getFileSystem(CachedConfiguration.getInstance());
+      Path vp = new Path(volumePath, ServerConstants.INSTANCE_ID_DIR);
+      FileStatus[] iids = fs.listStatus(vp);
+      Assert.assertEquals(1, iids.length);
+      Assert.assertEquals(uuid, iids[0].getPath().getName());
+    }
+
+    // start cluster and verify that new volume is used
+    cluster.start();
+
+    verifyVolumesUsed(tableNames[1], false, v1, v2, v3);
+  }
+
+  @Test
+  public void testNonConfiguredVolumes() throws Exception {
+
+    String[] tableNames = getUniqueNames(2);
+
+    // grab this before shutting down cluster
+    String uuid = new ZooKeeperInstance(cluster.getClientConfig()).getInstanceID();
+
+    verifyVolumesUsed(tableNames[0], false, v1, v2);
+
+    Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+    cluster.stop();
+
+    Configuration conf = new Configuration(false);
+    conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+
+    File v3f = new File(volDirBase, "v3");
+    assertTrue(v3f.mkdir() || v3f.isDirectory());
+    Path v3 = new Path("file://" + v3f.getAbsolutePath());
+
+    conf.set(Property.INSTANCE_VOLUMES.getKey(), v2.toString() + "," + v3.toString());
+    BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")));
+    conf.writeXml(fos);
+    fos.close();
+
+    // initialize volume
+    Assert.assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").waitFor());
+
+    // check that all volumes are initialized
+    for (Path volumePath : Arrays.asList(v1, v2, v3)) {
+      FileSystem fs = volumePath.getFileSystem(CachedConfiguration.getInstance());
+      Path vp = new Path(volumePath, ServerConstants.INSTANCE_ID_DIR);
+      FileStatus[] iids = fs.listStatus(vp);
+      Assert.assertEquals(1, iids.length);
+      Assert.assertEquals(uuid, iids[0].getPath().getName());
+    }
+
+    // start cluster and verify that new volume is used
+    cluster.start();
+
+    // Make sure we can still read the tables (tableNames[0] is very likely to have a file still on v1)
+    List<String> expected = new ArrayList<String>();
+    for (int i = 0; i < 100; i++) {
+      String row = String.format("%06d", i * 100 + 3);
+      expected.add(row + ":cf1:cq1:1");
+    }
+
+    verifyData(expected, getConnector().createScanner(tableNames[0], Authorizations.EMPTY));
+
+    // v1 should not have any data for tableNames[1]
+    verifyVolumesUsed(tableNames[1], false, v2, v3);
+  }
+
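+  // Creates the table with 99 split points and writes one row per tablet (cf1:cq1 -> "1")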
+  private void writeData(String tableName, Connector conn) throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException,
+      MutationsRejectedException {
+    TreeSet<Text> splits = new TreeSet<Text>();
+    for (int i = 1; i < 100; i++) {
+      splits.add(new Text(String.format("%06d", i * 100)));
+    }
+
+    conn.tableOperations().create(tableName);
+    conn.tableOperations().addSplits(tableName, splits);
+
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      String row = String.format("%06d", i * 100 + 3);
+      Mutation m = new Mutation(row);
+      m.put("cf1", "cq1", "1");
+      bw.addMutation(m);
+    }
+
+    bw.close();
+  }
+
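+  // Writes data if the table does not already exist, then checks the metadata table and the WAL state in ZooKeeper to ensure only the given volumes are referenced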
+  private void verifyVolumesUsed(String tableName, boolean shouldExist, Path... paths) throws Exception {
+
+    Connector conn = getConnector();
+
+    List<String> expected = new ArrayList<String>();
+    for (int i = 0; i < 100; i++) {
+      String row = String.format("%06d", i * 100 + 3);
+      expected.add(row + ":cf1:cq1:1");
+    }
+
+    if (!conn.tableOperations().exists(tableName)) {
+      Assert.assertFalse(shouldExist);
+
+      writeData(tableName, conn);
+
+      verifyData(expected, conn.createScanner(tableName, Authorizations.EMPTY));
+
+      conn.tableOperations().flush(tableName, null, null, true);
+    }
+
+    verifyData(expected, conn.createScanner(tableName, Authorizations.EMPTY));
+
+    String tableId = conn.tableOperations().tableIdMap().get(tableName);
+    Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(metaScanner);
+    metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+    metaScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+
+    int[] counts = new int[paths.length];
+
+    outer: for (Entry<Key,Value> entry : metaScanner) {
+      String cf = entry.getKey().getColumnFamily().toString();
+      String cq = entry.getKey().getColumnQualifier().toString();
+
+      String path;
+      if (cf.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME.toString()))
+        path = cq;
+      else
+        path = entry.getValue().toString();
+
+      for (int i = 0; i < paths.length; i++) {
+        if (path.startsWith(paths[i].toString())) {
+          counts[i]++;
+          continue outer;
+        }
+      }
+
+      Assert.fail("Unexpected volume " + path);
+    }
+
+    Instance i = conn.getInstance();
+    ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), "");
+    WalStateManager wals = new WalStateManager(i, zk);
+    outer: for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+      for (Path path : paths) {
+        if (entry.getKey().toString().startsWith(path.toString())) {
+          continue outer;
+        }
+      }
+      Assert.fail("Unexpected volume " + entry.getKey());
+    }
+
+    // if a volume is chosen randomly for each tablet, then the probability that a particular volume is never chosen is ((num_volumes - 1)/num_volumes)^num_tablets.
+    // For 100 tablets and 3 volumes that is (2/3)^100, about 2.46e-18, so it is safe to assert that every volume was used.
+
+    int sum = 0;
+    for (int count : counts) {
+      Assert.assertTrue(count > 0);
+      sum += count;
+    }
+
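+    // 100 tablets, each contributing one directory entry and one flushed file entry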
+    Assert.assertEquals(200, sum);
+
+  }
+
+  @Test
+  public void testRemoveVolumes() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+
+    verifyVolumesUsed(tableNames[0], false, v1, v2);
+
+    Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+    cluster.stop();
+
+    Configuration conf = new Configuration(false);
+    conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+
+    conf.set(Property.INSTANCE_VOLUMES.getKey(), v2.toString());
+    BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")));
+    conf.writeXml(fos);
+    fos.close();
+
+    // start cluster and verify that volume was decommissioned
+    cluster.start();
+
+    Connector conn = cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD));
+    conn.tableOperations().compact(tableNames[0], null, null, true, true);
+
+    verifyVolumesUsed(tableNames[0], true, v2);
+
+    // check that root tablet is not on volume 1
+    ZooReader zreader = new ZooReader(cluster.getZooKeepers(), 30000);
+    String zpath = ZooUtil.getRoot(new ZooKeeperInstance(cluster.getClientConfig())) + RootTable.ZROOT_TABLET_PATH;
+    String rootTabletDir = new String(zreader.getData(zpath, false, null), UTF_8);
+    Assert.assertTrue(rootTabletDir.startsWith(v2.toString()));
+
+    conn.tableOperations().clone(tableNames[0], tableNames[1], true, new HashMap<String,String>(), new HashSet<String>());
+
+    conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+    conn.tableOperations().flush(RootTable.NAME, null, null, true);
+
+    verifyVolumesUsed(tableNames[0], true, v2);
+    verifyVolumesUsed(tableNames[1], true, v2);
+
+  }
+
+  private void testReplaceVolume(boolean cleanShutdown) throws Exception {
+    String[] tableNames = getUniqueNames(3);
+
+    verifyVolumesUsed(tableNames[0], false, v1, v2);
+
+    // write to 2nd table, but do not flush data to disk before shutdown
+    writeData(tableNames[1], cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD)));
+
+    if (cleanShutdown)
+      Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+
+    cluster.stop();
+
+    File v1f = new File(v1.toUri());
+    File v8f = new File(new File(v1.getParent().toUri()), "v8");
+    Assert.assertTrue("Failed to rename " + v1f + " to " + v8f, v1f.renameTo(v8f));
+    Path v8 = new Path(v8f.toURI());
+
+    File v2f = new File(v2.toUri());
+    File v9f = new File(new File(v2.getParent().toUri()), "v9");
+    Assert.assertTrue("Failed to rename " + v2f + " to " + v9f, v2f.renameTo(v9f));
+    Path v9 = new Path(v9f.toURI());
+
+    Configuration conf = new Configuration(false);
+    conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+
+    conf.set(Property.INSTANCE_VOLUMES.getKey(), v8 + "," + v9);
+    conf.set(Property.INSTANCE_VOLUMES_REPLACEMENTS.getKey(), v1 + " " + v8 + "," + v2 + " " + v9);
+    BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")));
+    conf.writeXml(fos);
+    fos.close();
+
+    // start cluster and verify that volumes were replaced
+    cluster.start();
+
+    verifyVolumesUsed(tableNames[0], true, v8, v9);
+    verifyVolumesUsed(tableNames[1], true, v8, v9);
+
+    // verify writes to new dir
+    getConnector().tableOperations().compact(tableNames[0], null, null, true, true);
+    getConnector().tableOperations().compact(tableNames[1], null, null, true, true);
+
+    verifyVolumesUsed(tableNames[0], true, v8, v9);
+    verifyVolumesUsed(tableNames[1], true, v8, v9);
+
+    // check that root tablet is not on volume 1 or 2
+    ZooReader zreader = new ZooReader(cluster.getZooKeepers(), 30000);
+    String zpath = ZooUtil.getRoot(new ZooKeeperInstance(cluster.getClientConfig())) + RootTable.ZROOT_TABLET_PATH;
+    String rootTabletDir = new String(zreader.getData(zpath, false, null), UTF_8);
+    Assert.assertTrue(rootTabletDir.startsWith(v8.toString()) || rootTabletDir.startsWith(v9.toString()));
+
+    getConnector().tableOperations().clone(tableNames[1], tableNames[2], true, new HashMap<String,String>(), new HashSet<String>());
+
+    getConnector().tableOperations().flush(MetadataTable.NAME, null, null, true);
+    getConnector().tableOperations().flush(RootTable.NAME, null, null, true);
+
+    verifyVolumesUsed(tableNames[0], true, v8, v9);
+    verifyVolumesUsed(tableNames[1], true, v8, v9);
+    verifyVolumesUsed(tableNames[2], true, v8, v9);
+  }
+
+  @Test
+  public void testCleanReplaceVolumes() throws Exception {
+    testReplaceVolume(true);
+  }
+
+  @Test
+  public void testDirtyReplaceVolumes() throws Exception {
+    testReplaceVolume(false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java b/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java
new file mode 100644
index 0000000..249bf14
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class WaitForBalanceIT extends ConfigurableMacBase {
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    final Connector c = getConnector();
+    // ensure the metadata table is online
+    Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+    c.instanceOperations().waitForBalance();
+    assertTrue(isBalanced());
+    final String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    c.instanceOperations().waitForBalance();
+    final SortedSet<Text> partitionKeys = new TreeSet<Text>();
+    for (int i = 0; i < 1000; i++) {
+      partitionKeys.add(new Text("" + i));
+    }
+    c.tableOperations().addSplits(tableName, partitionKeys);
+    assertFalse(isBalanced());
+    c.instanceOperations().waitForBalance();
+    assertTrue(isBalanced());
+  }
+
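+  // Reads tablet locations from the root and metadata tables; considers the cluster balanced when every tserver's tablet count is within the table count of the average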
+  private boolean isBalanced() throws Exception {
+    final Map<String,Integer> counts = new HashMap<String,Integer>();
+    int offline = 0;
+    final Connector c = getConnector();
+    for (String tableName : new String[] {MetadataTable.NAME, RootTable.NAME}) {
+      final Scanner s = c.createScanner(tableName, Authorizations.EMPTY);
+      s.setRange(MetadataSchema.TabletsSection.getRange());
+      s.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+      MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(s);
+      String location = null;
+      for (Entry<Key,Value> entry : s) {
+        Key key = entry.getKey();
+        if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)) {
+          location = key.getColumnQualifier().toString();
+        } else if (MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
+          if (location == null) {
+            offline++;
+          } else {
+            Integer count = counts.get(location);
+            if (count == null)
+              count = 0;
+            counts.put(location, count + 1);
+          }
+          location = null;
+        }
+      }
+    }
+    // the replication table is expected to be offline for this test, so ignore it
+    if (offline > 1) {
+      System.out.println("Offline tablets " + offline);
+      return false;
+    }
+    int average = 0;
+    for (Integer i : counts.values()) {
+      average += i;
+    }
+    average /= counts.size();
+    System.out.println(counts);
+    int tablesCount = c.tableOperations().list().size();
+    for (Entry<String,Integer> hostCount : counts.entrySet()) {
+      if (Math.abs(average - hostCount.getValue()) > tablesCount) {
+        System.out.println("Average " + average + " count " + hostCount.getKey() + ": " + hostCount.getValue());
+        return false;
+      }
+    }
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
new file mode 100644
index 0000000..118f053
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.lang.System.currentTimeMillis;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AccumuloInputFormatIT extends AccumuloClusterHarness {
+
+  AccumuloInputFormat inputFormat;
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+  }
+
+  @Before
+  public void before() {
+    inputFormat = new AccumuloInputFormat();
+  }
+
+  /**
+   * Tests several different paths through the getSplits() method by setting different properties and verifying the results.
+   */
+  @Test
+  public void testGetSplits() throws Exception {
+    Connector conn = getConnector();
+    String table = getUniqueNames(1)[0];
+    conn.tableOperations().create(table);
+    insertData(table, currentTimeMillis());
+
+    ClientConfiguration clientConf = cluster.getClientConfig();
+    AccumuloConfiguration clusterClientConf = new ConfigurationCopy(new DefaultConfiguration());
+
+    // Pass SSL and CredentialProvider options into the ClientConfiguration given to AccumuloInputFormat
+    boolean sslEnabled = Boolean.valueOf(clusterClientConf.get(Property.INSTANCE_RPC_SSL_ENABLED));
+    if (sslEnabled) {
+      ClientProperty[] sslProperties = new ClientProperty[] {ClientProperty.INSTANCE_RPC_SSL_ENABLED, ClientProperty.INSTANCE_RPC_SSL_CLIENT_AUTH,
+          ClientProperty.RPC_SSL_KEYSTORE_PATH, ClientProperty.RPC_SSL_KEYSTORE_TYPE, ClientProperty.RPC_SSL_KEYSTORE_PASSWORD,
+          ClientProperty.RPC_SSL_TRUSTSTORE_PATH, ClientProperty.RPC_SSL_TRUSTSTORE_TYPE, ClientProperty.RPC_SSL_TRUSTSTORE_PASSWORD,
+          ClientProperty.RPC_USE_JSSE, ClientProperty.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS};
+
+      for (ClientProperty prop : sslProperties) {
+        // The default property is returned if it's not in the ClientConfiguration so we don't have to check if the value is actually defined
+        clientConf.setProperty(prop, clusterClientConf.get(prop.getKey()));
+      }
+    }
+
+    Job job = Job.getInstance();
+    AccumuloInputFormat.setInputTableName(job, table);
+    AccumuloInputFormat.setZooKeeperInstance(job, clientConf);
+    AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+
+    // split table
+    TreeSet<Text> splitsToAdd = new TreeSet<Text>();
+    for (int i = 0; i < 10000; i += 1000)
+      splitsToAdd.add(new Text(String.format("%09d", i)));
+    conn.tableOperations().addSplits(table, splitsToAdd);
+    UtilWaitThread.sleep(500); // wait for splits to be propagated
+
+    // get splits without setting any range
+    Collection<Text> actualSplits = conn.tableOperations().listSplits(table);
+    List<InputSplit> splits = inputFormat.getSplits(job);
+    assertEquals(actualSplits.size() + 1, splits.size()); // No ranges set on the job so it'll start with -inf
+
+    // set ranges and get splits
+    List<Range> ranges = new ArrayList<Range>();
+    for (Text text : actualSplits)
+      ranges.add(new Range(text));
+    AccumuloInputFormat.setRanges(job, ranges);
+    splits = inputFormat.getSplits(job);
+    assertEquals(actualSplits.size(), splits.size());
+
+    // offline mode
+    AccumuloInputFormat.setOfflineTableScan(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IOException e) {}
+
+    conn.tableOperations().offline(table, true);
+    splits = inputFormat.getSplits(job);
+    assertEquals(actualSplits.size(), splits.size());
+
+    // auto adjust ranges
+    ranges = new ArrayList<Range>();
+    for (int i = 0; i < 5; i++)
+      // overlapping ranges
+      ranges.add(new Range(String.format("%09d", i), String.format("%09d", i + 2)));
+    AccumuloInputFormat.setRanges(job, ranges);
+    splits = inputFormat.getSplits(job);
+    assertEquals(2, splits.size());
+
+    AccumuloInputFormat.setAutoAdjustRanges(job, false);
+    splits = inputFormat.getSplits(job);
+    assertEquals(ranges.size(), splits.size());
+
+    // BatchScan not available for offline scans
+    AccumuloInputFormat.setBatchScan(job, true);
+    // Reset auto-adjust ranges too
+    AccumuloInputFormat.setAutoAdjustRanges(job, true);
+
+    AccumuloInputFormat.setOfflineTableScan(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IllegalArgumentException e) {}
+
+    conn.tableOperations().online(table, true);
+    AccumuloInputFormat.setOfflineTableScan(job, false);
+
+    // test for resumption of success
+    splits = inputFormat.getSplits(job);
+    assertEquals(2, splits.size());
+
+    // BatchScan not available with isolated iterators
+    AccumuloInputFormat.setScanIsolation(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IllegalArgumentException e) {}
+    AccumuloInputFormat.setScanIsolation(job, false);
+
+    // test for resumption of success
+    splits = inputFormat.getSplits(job);
+    assertEquals(2, splits.size());
+
+    // BatchScan not available with local iterators
+    AccumuloInputFormat.setLocalIterators(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IllegalArgumentException e) {}
+    AccumuloInputFormat.setLocalIterators(job, false);
+
+    // Check that we are getting back the correct type of split
+    conn.tableOperations().online(table);
+    splits = inputFormat.getSplits(job);
+    for (InputSplit split : splits)
+      assertTrue(split instanceof BatchInputSplit);
+
+    // We should divide along the tablet lines similar to when using `setAutoAdjustRanges(job, true)`
+    assertEquals(2, splits.size());
+  }
+
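+  // Writes 10000 rows (cf1:cq1 -> the row index) at the given timestamp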
+  private void insertData(String tableName, long ts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    BatchWriter bw = getConnector().createBatchWriter(tableName, null);
+
+    for (int i = 0; i < 10000; i++) {
+      String row = String.format("%09d", i);
+
+      Mutation m = new Mutation(new Text(row));
+      m.put(new Text("cf1"), new Text("cq1"), ts, new Value(("" + i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+  }
+}
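
For reference, a minimal sketch (not part of this commit) of wiring AccumuloInputFormat into a standalone MapReduce job using the same configuration calls the test exercises; the instance name, ZooKeeper hosts, credentials, table name, and mapper class are illustrative placeholders:

    Job job = Job.getInstance();
    job.setInputFormatClass(AccumuloInputFormat.class);
    AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken("secret")); // placeholder credentials
    AccumuloInputFormat.setZooKeeperInstance(job,
        ClientConfiguration.loadDefault().withInstance("instance").withZkHosts("localhost:2181"));
    AccumuloInputFormat.setInputTableName(job, "exampleTable"); // placeholder table
    AccumuloInputFormat.setRanges(job, Collections.singletonList(new Range("000000000", "000001000")));
    // job.setMapperClass(ExampleMapper.class); // hypothetical Mapper over Key/Value pairs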

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java
new file mode 100644
index 0000000..4b4aeac
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class AddSplitIT extends AccumuloClusterHarness {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void addSplitTest() throws Exception {
+
+    String tableName = getUniqueNames(1)[0];
+    Connector c = getConnector();
+    c.tableOperations().create(tableName);
+
+    insertData(tableName, 1L);
+
+    TreeSet<Text> splits = new TreeSet<Text>();
+    splits.add(new Text(String.format("%09d", 333)));
+    splits.add(new Text(String.format("%09d", 666)));
+
+    c.tableOperations().addSplits(tableName, splits);
+
+    UtilWaitThread.sleep(100);
+
+    Collection<Text> actualSplits = c.tableOperations().listSplits(tableName);
+
+    if (!splits.equals(new TreeSet<Text>(actualSplits))) {
+      throw new Exception(splits + " != " + actualSplits);
+    }
+
+    verifyData(tableName, 1L);
+    insertData(tableName, 2L);
+
+    // splits is intentionally not cleared; addSplits should ignore the existing split points
+    // and still create the three additional split points
+
+    splits.add(new Text(String.format("%09d", 200)));
+    splits.add(new Text(String.format("%09d", 500)));
+    splits.add(new Text(String.format("%09d", 800)));
+
+    c.tableOperations().addSplits(tableName, splits);
+
+    UtilWaitThread.sleep(100);
+
+    actualSplits = c.tableOperations().listSplits(tableName);
+
+    if (!splits.equals(new TreeSet<Text>(actualSplits))) {
+      throw new Exception(splits + " != " + actualSplits);
+    }
+
+    verifyData(tableName, 2L);
+  }
+
+  private void verifyData(String tableName, long ts) throws Exception {
+    Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY);
+
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+    for (int i = 0; i < 10000; i++) {
+      if (!iter.hasNext()) {
+        throw new Exception("row " + i + " not found");
+      }
+
+      Entry<Key,Value> entry = iter.next();
+
+      String row = String.format("%09d", i);
+
+      if (!entry.getKey().getRow().equals(new Text(row))) {
+        throw new Exception("unexpected row " + entry.getKey() + " " + i);
+      }
+
+      if (entry.getKey().getTimestamp() != ts) {
+        throw new Exception("unexpected ts " + entry.getKey() + " " + ts);
+      }
+
+      if (Integer.parseInt(entry.getValue().toString()) != i) {
+        throw new Exception("unexpected value " + entry + " " + i);
+      }
+    }
+
+    if (iter.hasNext()) {
+      throw new Exception("found more than expected " + iter.next());
+    }
+
+  }
+
+  private void insertData(String tableName, long ts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException {
+    BatchWriter bw = getConnector().createBatchWriter(tableName, null);
+
+    for (int i = 0; i < 10000; i++) {
+      String row = String.format("%09d", i);
+
+      Mutation m = new Mutation(new Text(row));
+      m.put(new Text("cf1"), new Text("cq1"), ts, new Value(Integer.toString(i).getBytes(UTF_8)));
+      bw.addMutation(m);
+    }
+
+    bw.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BackupMasterIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BackupMasterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BackupMasterIT.java
new file mode 100644
index 0000000..d8979db
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BackupMasterIT.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.master.Master;
+import org.junit.Test;
+
+public class BackupMasterIT extends ConfigurableMacBase {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 120;
+  }
+
+  @Test
+  public void test() throws Exception {
+    // wait for master
+    UtilWaitThread.sleep(1000);
+    // create a backup
+    Process backup = exec(Master.class);
+    try {
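+      // connect to ZooKeeper with digest auth so the test can read and delete the master lock nodes directly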
+      ZooReaderWriter writer = new ZooReaderWriter(cluster.getZooKeepers(), 30 * 1000, "digest", "accumulo:DONTTELL".getBytes());
+      String root = "/accumulo/" + getConnector().getInstance().getInstanceID();
+      List<String> children = Collections.emptyList();
+      // wait for 2 lock entries
+      do {
+        UtilWaitThread.sleep(100);
+        children = writer.getChildren(root + "/masters/lock");
+      } while (children.size() != 2);
+      Collections.sort(children);
+      // wait for the backup master to learn that it is the backup
+      UtilWaitThread.sleep(1000);
+      // generate a false zookeeper event
+      String lockPath = root + "/masters/lock/" + children.get(0);
+      byte data[] = writer.getData(lockPath, null);
+      writer.getZooKeeper().setData(lockPath, data, -1);
+      // let it propagate
+      UtilWaitThread.sleep(500);
+      // kill the master by removing its lock
+      writer.recursiveDelete(lockPath, NodeMissingPolicy.FAIL);
+      // ensure the backup becomes the master
+      getConnector().tableOperations().create(getUniqueNames(1)[0]);
+    } finally {
+      backup.destroy();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
new file mode 100644
index 0000000..4c6fc00
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+import java.util.EnumSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class BadIteratorMincIT extends AccumuloClusterHarness {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
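+    // attach an iterator that throws during minor compaction so flushed data cannot be written out to rfiles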
+    IteratorSetting is = new IteratorSetting(30, BadIterator.class);
+    c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc));
+    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+
+    Mutation m = new Mutation(new Text("r1"));
+    m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(UTF_8)));
+
+    bw.addMutation(m);
+    bw.close();
+
+    c.tableOperations().flush(tableName, null, null, false);
+    UtilWaitThread.sleep(1000);
+
+    // minc should fail, so there should be no files
+    FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 0, 0);
+
+    // try to scan table
+    Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
+    int count = Iterators.size(scanner.iterator());
+    assertEquals("Did not see expected # entries " + count, 1, count);
+
+    // remove the bad iterator
+    c.tableOperations().removeIterator(tableName, BadIterator.class.getSimpleName(), EnumSet.of(IteratorScope.minc));
+
+    UtilWaitThread.sleep(5000);
+
+    // minc should complete
+    FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 1, 1);
+
+    count = Iterators.size(scanner.iterator());
+
+    if (count != 1)
+      throw new Exception("Did not see expected # entries " + count);
+
+    // now try putting bad iterator back and deleting the table
+    c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc));
+    bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+    m = new Mutation(new Text("r2"));
+    m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(UTF_8)));
+    bw.addMutation(m);
+    bw.close();
+
+    // make sure property is given time to propagate
+    UtilWaitThread.sleep(500);
+
+    c.tableOperations().flush(tableName, null, null, false);
+
+    // make sure the flush has time to start
+    UtilWaitThread.sleep(1000);
+
+    // this should not hang
+    c.tableOperations().delete(tableName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
new file mode 100644
index 0000000..ae470f6
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+public class BalanceAfterCommsFailureIT extends ConfigurableMacBase {
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.GENERAL_RPC_TIMEOUT, "2s");
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = this.getConnector();
+    c.tableOperations().create("test");
+    Collection<ProcessReference> tservers = getCluster().getProcesses().get(ServerType.TABLET_SERVER);
+    ArrayList<Integer> tserverPids = new ArrayList<Integer>(tservers.size());
+    for (ProcessReference tserver : tservers) {
+      Process p = tserver.getProcess();
+      if (!p.getClass().getName().equals("java.lang.UNIXProcess")) {
+        log.info("Found process that was not UNIXProcess, exiting test");
+        return;
+      }
+
+      Field f = p.getClass().getDeclaredField("pid");
+      f.setAccessible(true);
+      tserverPids.add(f.getInt(p));
+    }
+
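+    // SIGSTOP every tablet server for 20 seconds (well past the 2s RPC timeout) to simulate a communications failure, then resume them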
+    for (int pid : tserverPids) {
+      assertEquals(0, Runtime.getRuntime().exec(new String[] {"kill", "-SIGSTOP", Integer.toString(pid)}).waitFor());
+    }
+    UtilWaitThread.sleep(20 * 1000);
+    for (int pid : tserverPids) {
+      assertEquals(0, Runtime.getRuntime().exec(new String[] {"kill", "-SIGCONT", Integer.toString(pid)}).waitFor());
+    }
+    SortedSet<Text> splits = new TreeSet<Text>();
+    for (String split : "a b c d e f g h i j k l m n o p q r s t u v w x y z".split(" ")) {
+      splits.add(new Text(split));
+    }
+    c.tableOperations().addSplits("test", splits);
+    // Ensure all of the tablets are actually assigned
+    assertEquals(0, Iterables.size(c.createScanner("test", Authorizations.EMPTY)));
+    UtilWaitThread.sleep(30 * 1000);
+    checkBalance(c);
+  }
+
+  private void checkBalance(Connector c) throws Exception {
+    Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD));
+    ClientContext context = new ClientContext(c.getInstance(), creds, getClientConfig());
+
+    MasterMonitorInfo stats = null;
+    int unassignedTablets = 1;
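+    // poll the master stats until no tablets remain unassigned, checking at most 10 times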
+    for (int i = 0; unassignedTablets > 0 && i < 10; i++) {
+      MasterClientService.Iface client = null;
+      try {
+        client = MasterClient.getConnectionWithRetry(context);
+        stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+      } finally {
+        if (client != null)
+          MasterClient.close(client);
+      }
+      unassignedTablets = stats.getUnassignedTablets();
+      if (unassignedTablets > 0) {
+        log.info("Found " + unassignedTablets + " unassigned tablets, sleeping 3 seconds for tablet assignment");
+        Thread.sleep(3000);
+      }
+    }
+
+    assertEquals("Unassigned tablets were not assigned within 30 seconds", 0, unassignedTablets);
+
+    List<Integer> counts = new ArrayList<Integer>();
+    for (TabletServerStatus server : stats.tServerInfo) {
+      int count = 0;
+      for (TableInfo table : server.tableMap.values()) {
+        count += table.onlineTablets;
+      }
+      counts.add(count);
+    }
+    assertTrue("Expected to have at least two TabletServers", counts.size() > 1);
+    for (int i = 1; i < counts.size(); i++) {
+      int diff = Math.abs(counts.get(0) - counts.get(i));
+      assertTrue("Expected difference in tablets to be less than or equal to " + counts.size() + " but was " + diff + ". Counts " + counts,
+          diff <= counts.size());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
new file mode 100644
index 0000000..623d79b
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Start a new table, create many splits, and take the table offline before the tablets can rebalance. Then verify that a different table can still balance.
+ */
+public class BalanceInPresenceOfOfflineTableIT extends AccumuloClusterHarness {
+
+  private static Logger log = LoggerFactory.getLogger(BalanceInPresenceOfOfflineTableIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    Map<String,String> siteConfig = cfg.getSiteConfig();
+    siteConfig.put(Property.TSERV_MAXMEM.getKey(), "10K");
+    siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "0");
+    cfg.setSiteConfig(siteConfig);
+    // ensure we have two tservers
+    if (cfg.getNumTservers() < 2) {
+      cfg.setNumTservers(2);
+    }
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 10 * 60;
+  }
+
+  private static final int NUM_SPLITS = 200;
+
+  private String UNUSED_TABLE, TEST_TABLE;
+
+  private Connector connector;
+
+  @Before
+  public void setupTables() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException {
+    Connector conn = getConnector();
+    // Need at least two tservers
+    Assume.assumeTrue("Not enough tservers to run test", conn.instanceOperations().getTabletServers().size() >= 2);
+
+    // set up splits
+    final SortedSet<Text> splits = new TreeSet<Text>();
+    for (int i = 0; i < NUM_SPLITS; i++) {
+      splits.add(new Text(String.format("%08x", i * 1000)));
+    }
+
+    String[] names = getUniqueNames(2);
+    UNUSED_TABLE = names[0];
+    TEST_TABLE = names[1];
+
+    // load into a table we won't use
+    connector = getConnector();
+    connector.tableOperations().create(UNUSED_TABLE);
+    connector.tableOperations().addSplits(UNUSED_TABLE, splits);
+    // mark the table offline before it can rebalance.
+    connector.tableOperations().offline(UNUSED_TABLE);
+
+    // actual test table
+    connector.tableOperations().create(TEST_TABLE);
+    connector.tableOperations().setProperty(TEST_TABLE, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+  }
+
+  @Test
+  public void test() throws Exception {
+    log.info("Test that balancing is not stopped by an offline table with outstanding migrations.");
+
+    log.debug("starting test ingestion");
+
+    TestIngest.Opts opts = new TestIngest.Opts();
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+    ClientConfiguration conf = cluster.getClientConfig();
+    if (conf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+      opts.updateKerberosCredentials(cluster.getClientConfig());
+      vopts.updateKerberosCredentials(cluster.getClientConfig());
+    } else {
+      opts.setPrincipal("root");
+      vopts.setPrincipal("root");
+    }
+    vopts.rows = opts.rows = 200000;
+    opts.setTableName(TEST_TABLE);
+    TestIngest.ingest(connector, opts, new BatchWriterOpts());
+    connector.tableOperations().flush(TEST_TABLE, null, null, true);
+    vopts.setTableName(TEST_TABLE);
+    VerifyIngest.verifyIngest(connector, vopts, new ScannerOpts());
+
+    log.debug("waiting for balancing, up to ~5 minutes to allow for migration cleanup.");
+    final long startTime = System.currentTimeMillis();
+    long currentWait = 10 * 1000;
+    boolean balancingWorked = false;
+
+    Credentials creds = new Credentials(getAdminPrincipal(), getAdminToken());
+    while (!balancingWorked && (System.currentTimeMillis() - startTime) < ((5 * 60 + 15) * 1000)) {
+      Thread.sleep(currentWait);
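+      // double the wait before the next balance check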
+      currentWait *= 2;
+
+      log.debug("fetch the list of tablets assigned to each tserver.");
+
+      MasterClientService.Iface client = null;
+      MasterMonitorInfo stats = null;
+      try {
+        Instance instance = new ZooKeeperInstance(cluster.getClientConfig());
+        client = MasterClient.getConnectionWithRetry(new ClientContext(instance, creds, cluster.getClientConfig()));
+        stats = client.getMasterStats(Tracer.traceInfo(), creds.toThrift(instance));
+      } catch (ThriftSecurityException exception) {
+        throw new AccumuloSecurityException(exception);
+      } catch (TException exception) {
+        throw new AccumuloException(exception);
+      } finally {
+        if (client != null) {
+          MasterClient.close(client);
+        }
+      }
+
+      if (stats.getTServerInfoSize() < 2) {
+        log.debug("we need >= 2 servers. sleeping for " + currentWait + "ms");
+        continue;
+      }
+      if (stats.getUnassignedTablets() != 0) {
+        log.debug("We shouldn't have unassigned tablets. sleeping for " + currentWait + "ms");
+        continue;
+      }
+
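+      // count the tablets hosted by each tablet server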
+      long[] tabletsPerServer = new long[stats.getTServerInfoSize()];
+      Arrays.fill(tabletsPerServer, 0L);
+      for (int i = 0; i < stats.getTServerInfoSize(); i++) {
+        for (Map.Entry<String,TableInfo> entry : stats.getTServerInfo().get(i).getTableMap().entrySet()) {
+          tabletsPerServer[i] += entry.getValue().getTablets();
+        }
+      }
+
+      if (tabletsPerServer[0] <= 10) {
+        log.debug("We should have > 10 tablets. sleeping for " + currentWait + "ms");
+        continue;
+      }
+      long min = NumberUtils.min(tabletsPerServer), max = NumberUtils.max(tabletsPerServer);
+      log.debug("Min=" + min + ", Max=" + max);
+      if ((min / ((double) max)) < 0.5) {
+        log.debug("ratio of min to max tablets per server should be roughly even. sleeping for " + currentWait + "ms");
+        continue;
+      }
+      balancingWorked = true;
+    }
+
+    Assert.assertTrue("did not properly balance", balancingWorked);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
new file mode 100644
index 0000000..14295c4
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BatchScanSplitIT extends AccumuloClusterHarness {
+  private static final Logger log = LoggerFactory.getLogger(BatchScanSplitIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.TSERV_MAJC_DELAY, "0");
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+
+    int numRows = 1 << 18;
+
+    BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig());
+
+    for (int i = 0; i < numRows; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i)));
+      m.put(new Text("cf1"), new Text("cq1"), new Value(String.format("%016x", numRows - i).getBytes(UTF_8)));
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    getConnector().tableOperations().flush(tableName, null, null, true);
+
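+    // lower the split threshold so the flushed data causes the table to split into multiple tablets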
+    getConnector().tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "4K");
+
+    Collection<Text> splits = getConnector().tableOperations().listSplits(tableName);
+    while (splits.size() < 2) {
+      UtilWaitThread.sleep(1);
+      splits = getConnector().tableOperations().listSplits(tableName);
+    }
+
+    System.out.println("splits : " + splits);
+
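+    // pick 100 random rows and build a single-row range for each, to be looked up with a batch scanner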
+    Random random = new Random(19011230);
+    HashMap<Text,Value> expected = new HashMap<Text,Value>();
+    ArrayList<Range> ranges = new ArrayList<Range>();
+    for (int i = 0; i < 100; i++) {
+      int r = random.nextInt(numRows);
+      Text row = new Text(String.format("%09x", r));
+      expected.put(row, new Value(String.format("%016x", numRows - r).getBytes(UTF_8)));
+      ranges.add(new Range(row));
+    }
+
+    // logger.setLevel(Level.TRACE);
+
+    HashMap<Text,Value> found = new HashMap<Text,Value>();
+
+    for (int i = 0; i < 20; i++) {
+      BatchScanner bs = getConnector().createBatchScanner(tableName, Authorizations.EMPTY, 4);
+
+      found.clear();
+
+      long t1 = System.currentTimeMillis();
+
+      bs.setRanges(ranges);
+
+      for (Entry<Key,Value> entry : bs) {
+        found.put(entry.getKey().getRow(), entry.getValue());
+      }
+      bs.close();
+
+      long t2 = System.currentTimeMillis();
+
+      log.info(String.format("rate : %06.2f%n", ranges.size() / ((t2 - t1) / 1000.0)));
+
+      if (!found.equals(expected))
+        throw new Exception("Found and expected differ " + found + " " + expected);
+    }
+
+    splits = getConnector().tableOperations().listSplits(tableName);
+    log.info("splits : " + splits);
+  }
+
+}