Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/06/04 20:53:16 UTC

[35/43] accumulo git commit: ACCUMULO-3871 move ITs into distro jar, stop building test jar

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
new file mode 100644
index 0000000..7c05a0f
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class BatchWriterFlushIT extends AccumuloClusterHarness {
+
+  private static final int NUM_TO_FLUSH = 100000;
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 90;
+  }
+
+  @Test
+  public void run() throws Exception {
+    Connector c = getConnector();
+    String[] tableNames = getUniqueNames(2);
+    String bwft = tableNames[0];
+    c.tableOperations().create(bwft);
+    String bwlt = tableNames[1];
+    c.tableOperations().create(bwlt);
+    runFlushTest(bwft);
+    runLatencyTest(bwlt);
+
+  }
+
+  private void runLatencyTest(String tableName) throws Exception {
+    // should automatically flush after 1 second (the max latency configured on the writer below)
+    BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig().setMaxLatency(1000, TimeUnit.MILLISECONDS));
+    Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY);
+
+    Mutation m = new Mutation(new Text(String.format("r_%10d", 1)));
+    m.put(new Text("cf"), new Text("cq"), new Value("1".getBytes(UTF_8)));
+    bw.addMutation(m);
+
+    UtilWaitThread.sleep(500);
+
+    int count = Iterators.size(scanner.iterator());
+
+    if (count != 0) {
+      throw new Exception("Flushed too soon");
+    }
+
+    UtilWaitThread.sleep(1500);
+
+    count = Iterators.size(scanner.iterator());
+
+    if (count != 1) {
+      throw new Exception("Did not flush");
+    }
+
+    bw.close();
+  }
+
+  private void runFlushTest(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException,
+      Exception {
+    BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig());
+    Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY);
+    Random r = new Random();
+
+    for (int i = 0; i < 4; i++) {
+      for (int j = 0; j < NUM_TO_FLUSH; j++) {
+        int row = i * NUM_TO_FLUSH + j;
+
+        Mutation m = new Mutation(new Text(String.format("r_%10d", row)));
+        m.put(new Text("cf"), new Text("cq"), new Value(("" + row).getBytes(UTF_8)));
+        bw.addMutation(m);
+      }
+
+      bw.flush();
+
+      // do a few random lookups into the data just flushed
+
+      for (int k = 0; k < 10; k++) {
+        int rowToLookup = r.nextInt(NUM_TO_FLUSH) + i * NUM_TO_FLUSH;
+
+        scanner.setRange(new Range(new Text(String.format("r_%10d", rowToLookup))));
+
+        Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+        if (!iter.hasNext())
+          throw new Exception(" row " + rowToLookup + " not found after flush");
+
+        Entry<Key,Value> entry = iter.next();
+
+        if (iter.hasNext())
+          throw new Exception("Scanner returned too much");
+
+        verifyEntry(rowToLookup, entry);
+      }
+
+      // scan all data just flushed
+      scanner.setRange(new Range(new Text(String.format("r_%10d", i * NUM_TO_FLUSH)), true, new Text(String.format("r_%10d", (i + 1) * NUM_TO_FLUSH)), false));
+      Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+      for (int j = 0; j < NUM_TO_FLUSH; j++) {
+        int row = i * NUM_TO_FLUSH + j;
+
+        if (!iter.hasNext())
+          throw new Exception("Scan stopped permaturely at " + row);
+
+        Entry<Key,Value> entry = iter.next();
+
+        verifyEntry(row, entry);
+      }
+
+      if (iter.hasNext())
+        throw new Exception("Scanner returned too much");
+
+    }
+
+    bw.close();
+
+    // test adding a mutation to a closed batch writer
+    boolean caught = false;
+    try {
+      bw.addMutation(new Mutation(new Text("foobar")));
+    } catch (IllegalStateException ise) {
+      caught = true;
+    }
+
+    if (!caught) {
+      throw new Exception("Adding to closed batch writer did not fail");
+    }
+  }
+
+  private void verifyEntry(int row, Entry<Key,Value> entry) throws Exception {
+    if (!entry.getKey().getRow().toString().equals(String.format("r_%10d", row))) {
+      throw new Exception("Unexpected key returned, expected " + row + " got " + entry.getKey());
+    }
+
+    if (!entry.getValue().toString().equals("" + row)) {
+      throw new Exception("Unexpected value, expected " + row + " got " + entry.getValue());
+    }
+  }
+
+}
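
For reference, a minimal sketch of the two flush paths the test above exercises, using the same 1.x client API (Connector, BatchWriterConfig); the connector and table name here are illustrative:

    // buffered mutations are pushed either explicitly (flush/close) or when the max latency expires
    BatchWriterConfig cfg = new BatchWriterConfig()
        .setMaxMemory(10 * 1024 * 1024)             // buffer up to ~10MB of mutations
        .setMaxLatency(1000, TimeUnit.MILLISECONDS) // or auto-flush after 1 second
        .setMaxWriteThreads(4);
    BatchWriter writer = connector.createBatchWriter("mytable", cfg);
    Mutation m = new Mutation("row1");
    m.put("cf", "cq", "value");
    writer.addMutation(m);
    writer.flush(); // force buffered mutations out to the tablet servers now
    writer.close(); // flushes anything remaining; a later addMutation() throws IllegalStateException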

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BigRootTabletIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BigRootTabletIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BigRootTabletIT.java
new file mode 100644
index 0000000..11dcb66
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BigRootTabletIT.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class BigRootTabletIT extends AccumuloClusterHarness {
+  // ACCUMULO-542: A large root tablet will fail to load if it doesn't fit in the tserver scan buffers
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    Map<String,String> siteConfig = cfg.getSiteConfig();
+    siteConfig.put(Property.TABLE_SCAN_MAXMEM.getKey(), "1024");
+    siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "60m");
+    cfg.setSiteConfig(siteConfig);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+    c.tableOperations().addSplits(MetadataTable.NAME, FunctionalTestUtils.splits("0 1 2 3 4 5 6 7 8 9 a".split(" ")));
+    String[] names = getUniqueNames(10);
+    for (String name : names) {
+      c.tableOperations().create(name);
+      c.tableOperations().flush(MetadataTable.NAME, null, null, true);
+      c.tableOperations().flush(RootTable.NAME, null, null, true);
+    }
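+    // restart so the now-large root tablet must be reloaded within the small scan buffer configured above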
+    cluster.stop();
+    cluster.start();
+    assertTrue(Iterators.size(c.createScanner(RootTable.NAME, Authorizations.EMPTY).iterator()) > 0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BinaryIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BinaryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BinaryIT.java
new file mode 100644
index 0000000..85716d5
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BinaryIT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.TestBinaryRows;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class BinaryIT extends AccumuloClusterHarness {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 90;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    runTest(c, tableName);
+  }
+
+  @Test
+  public void testPreSplit() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    Connector c = getConnector();
+    c.tableOperations().create(tableName);
+    SortedSet<Text> splits = new TreeSet<Text>();
+    splits.add(new Text("8"));
+    splits.add(new Text("256"));
+    c.tableOperations().addSplits(tableName, splits);
+    runTest(c, tableName);
+  }
+
+  public static void runTest(Connector c, String tableName) throws Exception {
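+    // phases: ingest 100k rows, verify them, delete the middle 50k, verify the remaining head, random-lookup the tail, then verify the deleted range is gone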
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    ScannerOpts scanOpts = new ScannerOpts();
+    TestBinaryRows.Opts opts = new TestBinaryRows.Opts();
+    opts.setTableName(tableName);
+    opts.start = 0;
+    opts.num = 100000;
+    opts.mode = "ingest";
+    TestBinaryRows.runTest(c, opts, bwOpts, scanOpts);
+    opts.mode = "verify";
+    TestBinaryRows.runTest(c, opts, bwOpts, scanOpts);
+    opts.start = 25000;
+    opts.num = 50000;
+    opts.mode = "delete";
+    TestBinaryRows.runTest(c, opts, bwOpts, scanOpts);
+    opts.start = 0;
+    opts.num = 25000;
+    opts.mode = "verify";
+    TestBinaryRows.runTest(c, opts, bwOpts, scanOpts);
+    opts.start = 75000;
+    opts.num = 25000;
+    opts.mode = "randomLookups";
+    TestBinaryRows.runTest(c, opts, bwOpts, scanOpts);
+    opts.start = 25000;
+    opts.num = 50000;
+    opts.mode = "verifyDeleted";
+    TestBinaryRows.runTest(c, opts, bwOpts, scanOpts);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BinaryStressIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BinaryStressIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BinaryStressIT.java
new file mode 100644
index 0000000..440d2cf
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BinaryStressIT.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BinaryStressIT extends AccumuloClusterHarness {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
+    cfg.setProperty(Property.TSERV_MAXMEM, "50K");
+    cfg.setProperty(Property.TSERV_MAJC_DELAY, "0");
+  }
+
+  private String majcDelay, maxMem;
+
+  @Before
+  public void alterConfig() throws Exception {
+    if (ClusterType.MINI == getClusterType()) {
+      return;
+    }
+
+    InstanceOperations iops = getConnector().instanceOperations();
+    Map<String,String> conf = iops.getSystemConfiguration();
+    majcDelay = conf.get(Property.TSERV_MAJC_DELAY.getKey());
+    maxMem = conf.get(Property.TSERV_MAXMEM.getKey());
+
+    iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), "0");
+    iops.setProperty(Property.TSERV_MAXMEM.getKey(), "50K");
+
+    getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+    getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+  }
+
+  @After
+  public void resetConfig() throws Exception {
+    if (null != majcDelay) {
+      InstanceOperations iops = getConnector().instanceOperations();
+      iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay);
+      iops.setProperty(Property.TSERV_MAXMEM.getKey(), maxMem);
+
+      getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+      getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+    }
+  }
+
+  @Test
+  public void binaryStressTest() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+    BinaryIT.runTest(c, tableName);
+    String id = c.tableOperations().tableIdMap().get(tableName);
+    Set<Text> tablets = new HashSet<>();
+    Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(Range.prefix(id));
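+    // each distinct row in the metadata table under this table-id prefix names a tablet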
+    for (Entry<Key,Value> entry : s) {
+      tablets.add(entry.getKey().getRow());
+    }
+    assertTrue("Expected at least 8 tablets, saw " + tablets.size(), tablets.size() > 7);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BloomFilterIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BloomFilterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BloomFilterIT.java
new file mode 100644
index 0000000..fbbe542
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BloomFilterIT.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.keyfunctor.ColumnFamilyFunctor;
+import org.apache.accumulo.core.file.keyfunctor.ColumnQualifierFunctor;
+import org.apache.accumulo.core.file.keyfunctor.RowFunctor;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BloomFilterIT extends AccumuloClusterHarness {
+  private static final Logger log = LoggerFactory.getLogger(BloomFilterIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setDefaultMemory(1, MemoryUnit.GIGABYTE);
+    cfg.setNumTservers(1);
+    Map<String,String> siteConfig = cfg.getSiteConfig();
+    siteConfig.put(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "10M");
+    cfg.setSiteConfig(siteConfig);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 6 * 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+    final String readAhead = c.instanceOperations().getSystemConfiguration().get(Property.TSERV_READ_AHEAD_MAXCONCURRENT.getKey());
+    c.instanceOperations().setProperty(Property.TSERV_READ_AHEAD_MAXCONCURRENT.getKey(), "1");
+    try {
+      Thread.sleep(1000);
+      final String[] tables = getUniqueNames(4);
+      for (String table : tables) {
+        TableOperations tops = c.tableOperations();
+        tops.create(table);
+        tops.setProperty(table, Property.TABLE_INDEXCACHE_ENABLED.getKey(), "false");
+        tops.setProperty(table, Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "false");
+        tops.setProperty(table, Property.TABLE_BLOOM_SIZE.getKey(), "2000000");
+        tops.setProperty(table, Property.TABLE_BLOOM_ERRORRATE.getKey(), "1%");
+        tops.setProperty(table, Property.TABLE_BLOOM_LOAD_THRESHOLD.getKey(), "0");
+        tops.setProperty(table, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64K");
+      }
+      log.info("Writing");
+      write(c, tables[0], 1, 0, 2000000000, 500);
+      write(c, tables[1], 2, 0, 2000000000, 500);
+      write(c, tables[2], 3, 0, 2000000000, 500);
+      log.info("Writing complete");
+
+      // test inserting an empty key
+      BatchWriter bw = c.createBatchWriter(tables[3], new BatchWriterConfig());
+      Mutation m = new Mutation(new Text(""));
+      m.put(new Text(""), new Text(""), new Value("foo1".getBytes()));
+      bw.addMutation(m);
+      bw.close();
+      c.tableOperations().flush(tables[3], null, null, true);
+
+      for (String table : Arrays.asList(tables[0], tables[1], tables[2])) {
+        c.tableOperations().compact(table, null, null, true, true);
+      }
+
+      // ensure compactions are finished
+      for (String table : tables) {
+        FunctionalTestUtils.checkRFiles(c, table, 1, 1, 1, 1);
+      }
+
+      // these queries will only run quickly once bloom filters are enabled, so let's get a baseline first
+      log.info("Base query");
+      long t1 = query(c, tables[0], 1, 0, 2000000000, 5000, 500);
+      long t2 = query(c, tables[1], 2, 0, 2000000000, 5000, 500);
+      long t3 = query(c, tables[2], 3, 0, 2000000000, 5000, 500);
+      log.info("Base query complete");
+
+      log.info("Rewriting with bloom filters");
+      c.tableOperations().setProperty(tables[0], Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+      c.tableOperations().setProperty(tables[0], Property.TABLE_BLOOM_KEY_FUNCTOR.getKey(), RowFunctor.class.getName());
+
+      c.tableOperations().setProperty(tables[1], Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+      c.tableOperations().setProperty(tables[1], Property.TABLE_BLOOM_KEY_FUNCTOR.getKey(), ColumnFamilyFunctor.class.getName());
+
+      c.tableOperations().setProperty(tables[2], Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+      c.tableOperations().setProperty(tables[2], Property.TABLE_BLOOM_KEY_FUNCTOR.getKey(), ColumnQualifierFunctor.class.getName());
+
+      c.tableOperations().setProperty(tables[3], Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+      c.tableOperations().setProperty(tables[3], Property.TABLE_BLOOM_KEY_FUNCTOR.getKey(), RowFunctor.class.getName());
+
+      // ensure the updates to zookeeper propagate
+      UtilWaitThread.sleep(500);
+
+      c.tableOperations().compact(tables[3], null, null, false, true);
+      c.tableOperations().compact(tables[0], null, null, false, true);
+      c.tableOperations().compact(tables[1], null, null, false, true);
+      c.tableOperations().compact(tables[2], null, null, false, true);
+      log.info("Rewriting with bloom filters complete");
+
+      // these queries should only run quickly if bloom
+      // filters are working
+      log.info("Bloom query");
+      long tb1 = query(c, tables[0], 1, 0, 2000000000, 5000, 500);
+      long tb2 = query(c, tables[1], 2, 0, 2000000000, 5000, 500);
+      long tb3 = query(c, tables[2], 3, 0, 2000000000, 5000, 500);
+      log.info("Bloom query complete");
+      timeCheck(t1 + t2 + t3, tb1 + tb2 + tb3);
+
+      // test querying for empty key
+      Scanner scanner = c.createScanner(tables[3], Authorizations.EMPTY);
+      scanner.setRange(new Range(new Text("")));
+
+      if (!scanner.iterator().next().getValue().toString().equals("foo1")) {
+        throw new Exception("Did not see foo1");
+      }
+    } finally {
+      c.instanceOperations().setProperty(Property.TSERV_READ_AHEAD_MAXCONCURRENT.getKey(), readAhead);
+    }
+  }
+
+  private void timeCheck(long t1, long t2) throws Exception {
+    double improvement = (t1 - t2) * 1.0 / t1;
+    if (improvement < .1) {
+      throw new Exception("Queries had less than 10% improvement (old: " + t1 + " new: " + t2 + " improvement: " + (improvement * 100) + "%)");
+    }
+    log.info(String.format("Improvement: %.2f%% (%d vs %d)", (improvement * 100), t1, t2));
+  }
+
+  private long query(Connector c, String table, int depth, long start, long end, int num, int step) throws Exception {
+    Random r = new Random(42);
+
+    HashSet<Long> expected = new HashSet<Long>();
+    List<Range> ranges = new ArrayList<Range>(num);
+    Text key = new Text();
+    Text row = new Text("row"), cq = new Text("cq"), cf = new Text("cf");
+
+    for (int i = 0; i < num; ++i) {
+      Long k = ((r.nextLong() & 0x7fffffffffffffffL) % (end - start)) + start;
+      key.set(String.format("k_%010d", k));
+      Range range = null;
+      Key acuKey;
+
+      if (k % (start + step) == 0) {
+        expected.add(k);
+      }
+
+      switch (depth) {
+        case 1:
+          range = new Range(new Text(key));
+          break;
+        case 2:
+          acuKey = new Key(row, key, cq);
+          range = new Range(acuKey, true, acuKey.followingKey(PartialKey.ROW_COLFAM), false);
+          break;
+        case 3:
+          acuKey = new Key(row, cf, key);
+          range = new Range(acuKey, true, acuKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL), false);
+          break;
+      }
+
+      ranges.add(range);
+    }
+
+    BatchScanner bs = c.createBatchScanner(table, Authorizations.EMPTY, 1);
+    bs.setRanges(ranges);
+
+    long t1 = System.currentTimeMillis();
+    for (Entry<Key,Value> entry : bs) {
+      long v = Long.parseLong(entry.getValue().toString());
+      if (!expected.remove(v)) {
+        throw new Exception("Got unexpected return " + entry.getKey() + " " + entry.getValue());
+      }
+    }
+    long t2 = System.currentTimeMillis();
+
+    if (expected.size() > 0) {
+      throw new Exception("Did not get all expected values " + expected.size());
+    }
+
+    bs.close();
+
+    return t2 - t1;
+  }
+
+  private void write(Connector c, String table, int depth, long start, long end, int step) throws Exception {
+
+    BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+
+    for (long i = start; i < end; i += step) {
+      String key = String.format("k_%010d", i);
+
+      Mutation m = null;
+
+      switch (depth) {
+        case 1:
+          m = new Mutation(new Text(key));
+          m.put(new Text("cf"), new Text("cq"), new Value(("" + i).getBytes()));
+          break;
+        case 2:
+          m = new Mutation(new Text("row"));
+          m.put(new Text(key), new Text("cq"), new Value(("" + i).getBytes(UTF_8)));
+          break;
+        case 3:
+          m = new Mutation(new Text("row"));
+          m.put(new Text("cf"), new Text(key), new Value(("" + i).getBytes()));
+          break;
+      }
+
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    c.tableOperations().flush(table, null, null, true);
+  }
+}
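
A minimal sketch of the per-table bloom filter setup the test above toggles (same TableOperations API; the table name is illustrative). Bloom filters are only written into new files, which is why the test compacts before re-running its queries:

    TableOperations tops = connector.tableOperations();
    tops.setProperty("mytable", Property.TABLE_BLOOM_ENABLED.getKey(), "true");
    // pick the functor matching the lookup pattern: RowFunctor, ColumnFamilyFunctor, or ColumnQualifierFunctor
    tops.setProperty("mytable", Property.TABLE_BLOOM_KEY_FUNCTOR.getKey(), RowFunctor.class.getName());
    // rewrite existing files so they gain bloom filters too (flush=false, wait=true)
    tops.compact("mytable", null, null, false, true);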

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
new file mode 100644
index 0000000..1abafeb
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class BulkFileIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) {
+    cfg.setMemory(ServerType.TABLET_SERVER, 128 * 4, MemoryUnit.MEGABYTE);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  @Test
+  public void testBulkFile() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    SortedSet<Text> splits = new TreeSet<Text>();
+    for (String split : "0333 0666 0999 1333 1666".split(" "))
+      splits.add(new Text(split));
+    c.tableOperations().addSplits(tableName, splits);
+    Configuration conf = new Configuration();
+    AccumuloConfiguration aconf = new ServerConfigurationFactory(c.getInstance()).getConfiguration();
+    FileSystem fs = getCluster().getFileSystem();
+
+    String rootPath = cluster.getTemporaryPath().toString();
+
+    String dir = rootPath + "/bulk_test_diff_files_89723987592_" + getUniqueNames(1)[0];
+
+    fs.delete(new Path(dir), true);
+
+    FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION, fs, conf, aconf);
+    writer1.startDefaultLocalityGroup();
+    writeData(writer1, 0, 333);
+    writer1.close();
+
+    FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION, fs, conf, aconf);
+    writer2.startDefaultLocalityGroup();
+    writeData(writer2, 334, 999);
+    writer2.close();
+
+    FileSKVWriter writer3 = FileOperations.getInstance().openWriter(dir + "/f3." + RFile.EXTENSION, fs, conf, aconf);
+    writer3.startDefaultLocalityGroup();
+    writeData(writer3, 1000, 1999);
+    writer3.close();
+
+    FunctionalTestUtils.bulkImport(c, fs, tableName, dir);
+
+    FunctionalTestUtils.checkRFiles(c, tableName, 6, 6, 1, 1);
+
+    verifyData(tableName, 0, 1999);
+
+  }
+
+  private void verifyData(String table, int s, int e) throws Exception {
+    Scanner scanner = getConnector().createScanner(table, Authorizations.EMPTY);
+
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+    for (int i = s; i <= e; i++) {
+      if (!iter.hasNext())
+        throw new Exception("row " + i + " not found");
+
+      Entry<Key,Value> entry = iter.next();
+
+      String row = String.format("%04d", i);
+
+      if (!entry.getKey().getRow().equals(new Text(row)))
+        throw new Exception("unexpected row " + entry.getKey() + " " + i);
+
+      if (Integer.parseInt(entry.getValue().toString()) != i)
+        throw new Exception("unexpected value " + entry + " " + i);
+    }
+
+    if (iter.hasNext())
+      throw new Exception("found more than expected " + iter.next());
+  }
+
+  private void writeData(FileSKVWriter w, int s, int e) throws Exception {
+    for (int i = s; i <= e; i++) {
+      w.append(new Key(new Text(String.format("%04d", i))), new Value(Integer.toString(i).getBytes(UTF_8)));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
new file mode 100644
index 0000000..f60724e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.TestIngest.Opts;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BulkIT extends AccumuloClusterHarness {
+
+  private static final int N = 100000;
+  private static final int COUNT = 5;
+  private static final BatchWriterOpts BWOPTS = new BatchWriterOpts();
+  private static final ScannerOpts SOPTS = new ScannerOpts();
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  private Configuration origConf;
+
+  @Before
+  public void saveConf() {
+    origConf = CachedConfiguration.getInstance();
+  }
+
+  @After
+  public void restoreConf() {
+    if (null != origConf) {
+      CachedConfiguration.setInstance(origConf);
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    runTest(getConnector(), getCluster().getFileSystem(), getCluster().getTemporaryPath(), getAdminPrincipal(), getUniqueNames(1)[0],
+        this.getClass().getName(), testName.getMethodName());
+  }
+
+  static void runTest(Connector c, FileSystem fs, Path basePath, String principal, String tableName, String filePrefix, String dirSuffix) throws Exception {
+    c.tableOperations().create(tableName);
+    CachedConfiguration.setInstance(fs.getConf());
+
+    Path base = new Path(basePath, "testBulkFail_" + dirSuffix);
+    fs.delete(base, true);
+    fs.mkdirs(base);
+    Path bulkFailures = new Path(base, "failures");
+    Path files = new Path(base, "files");
+    fs.mkdirs(bulkFailures);
+    fs.mkdirs(files);
+
+    Opts opts = new Opts();
+    opts.timestamp = 1;
+    opts.random = 56;
+    opts.rows = N;
+    opts.instance = c.getInstance().getInstanceName();
+    opts.cols = 1;
+    opts.setTableName(tableName);
+    opts.conf = CachedConfiguration.getInstance();
+    opts.fs = fs;
+    String fileFormat = filePrefix + "rf%02d";
+    for (int i = 0; i < COUNT; i++) {
+      opts.outputFile = new Path(files, String.format(fileFormat, i)).toString();
+      opts.startRow = N * i;
+      TestIngest.ingest(c, opts, BWOPTS);
+    }
+    opts.outputFile = base + String.format(fileFormat, N);
+    opts.startRow = N;
+    opts.rows = 1;
+    // create an rfile with one entry, there was a bug with this:
+    TestIngest.ingest(c, opts, BWOPTS);
+
+    // Make sure the server can modify the files
+    FsShell fsShell = new FsShell(fs.getConf());
+    Assert.assertEquals("Failed to chmod " + base.toString(), 0, fsShell.run(new String[] {"-chmod", "-R", "777", base.toString()}));
+
+    c.tableOperations().importDirectory(tableName, files.toString(), bulkFailures.toString(), false);
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+    vopts.setTableName(tableName);
+    vopts.random = 56;
+    vopts.setPrincipal(principal);
+    for (int i = 0; i < COUNT; i++) {
+      vopts.startRow = i * N;
+      vopts.rows = N;
+      VerifyIngest.verifyIngest(c, vopts, SOPTS);
+    }
+    vopts.startRow = N;
+    vopts.rows = 1;
+    VerifyIngest.verifyIngest(c, vopts, SOPTS);
+  }
+
+}
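
A minimal sketch of the bulk import call the test drives, assuming the rfiles were already written under a "files" directory and an empty "failures" directory exists on the same filesystem (paths illustrative):

    // importDirectory(table, dir of rfiles, failure dir, setTime); the failure
    // directory must exist and be empty -- files that cannot be imported are moved there
    connector.tableOperations().importDirectory("mytable", "/tmp/bulk/files", "/tmp/bulk/failures", false);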

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
new file mode 100644
index 0000000..74d3e96
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.accumulo.core.cli.ClientOpts.Password;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test verifies that when many files are bulk imported into a table with one tablet, and the tablet subsequently splits, not all map files go to the child tablets.
+ */
+
+public class BulkSplitOptimizationIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.TSERV_MAJC_DELAY, "1s");
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  private String majcDelay;
+
+  @Before
+  public void alterConfig() throws Exception {
+    Connector conn = getConnector();
+    majcDelay = conn.instanceOperations().getSystemConfiguration().get(Property.TSERV_MAJC_DELAY.getKey());
+    if (!"1s".equals(majcDelay)) {
+      conn.instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), "1s");
+      getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+      getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+    }
+  }
+
+  @After
+  public void resetConfig() throws Exception {
+    if (null != majcDelay) {
+      Connector conn = getConnector();
+      conn.instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay);
+      getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+      getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+    }
+  }
+
+  static final int ROWS = 100000;
+  static final int SPLITS = 99;
+
+  @Test
+  public void testBulkSplitOptimization() throws Exception {
+    final Connector c = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1000");
+    c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1000");
+    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
+
+    FileSystem fs = getFileSystem();
+    Path testDir = new Path(getUsableDir(), "testmf");
+    FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, SPLITS, 8);
+    FileStatus[] stats = fs.listStatus(testDir);
+
+    System.out.println("Number of generated files: " + stats.length);
+    FunctionalTestUtils.bulkImport(c, fs, tableName, testDir.toString());
+    FunctionalTestUtils.checkSplits(c, tableName, 0, 0);
+    FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100);
+
+    // initiate splits
+    getConnector().tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K");
+
+    UtilWaitThread.sleep(2000);
+
+    // wait until over split threshold -- should be 78 splits
+    while (getConnector().tableOperations().listSplits(tableName).size() < 75) {
+      UtilWaitThread.sleep(500);
+    }
+
+    FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
+    VerifyIngest.Opts opts = new VerifyIngest.Opts();
+    opts.timestamp = 1;
+    opts.dataSize = 50;
+    opts.random = 56;
+    opts.rows = 100000;
+    opts.startRow = 0;
+    opts.cols = 1;
+    opts.setTableName(tableName);
+
+    AuthenticationToken adminToken = getAdminToken();
+    if (adminToken instanceof PasswordToken) {
+      PasswordToken token = (PasswordToken) getAdminToken();
+      opts.setPassword(new Password(new String(token.getPassword(), UTF_8)));
+      opts.setPrincipal(getAdminPrincipal());
+    } else if (adminToken instanceof KerberosToken) {
+      ClientConfiguration clientConf = cluster.getClientConfig();
+      opts.updateKerberosCredentials(clientConf);
+    } else {
+      Assert.fail("Unknown token type");
+    }
+
+    VerifyIngest.verifyIngest(c, opts, new ScannerOpts());
+
+    // ensure each tablet does not have all map files, should be ~2.5 files per tablet
+    FunctionalTestUtils.checkRFiles(c, tableName, 50, 100, 1, 4);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java
new file mode 100644
index 0000000..4055c3a
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.master.balancer.ChaoticLoadBalancer;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class ChaoticBalancerIT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    Map<String,String> siteConfig = cfg.getSiteConfig();
+    siteConfig.put(Property.TSERV_MAXMEM.getKey(), "10K");
+    siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "0");
+    cfg.setSiteConfig(siteConfig);
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+    String[] names = getUniqueNames(2);
+    String tableName = names[0], unused = names[1];
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_LOAD_BALANCER.getKey(), ChaoticLoadBalancer.class.getName());
+    c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+    SortedSet<Text> splits = new TreeSet<Text>();
+    for (int i = 0; i < 100; i++) {
+      splits.add(new Text(String.format("%03d", i)));
+    }
+    c.tableOperations().create(unused);
+    c.tableOperations().addSplits(unused, splits);
+    TestIngest.Opts opts = new TestIngest.Opts();
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+    vopts.rows = opts.rows = 20000;
+    opts.setTableName(tableName);
+    vopts.setTableName(tableName);
+    ClientConfiguration clientConfig = getCluster().getClientConfig();
+    if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+      opts.updateKerberosCredentials(clientConfig);
+      vopts.updateKerberosCredentials(clientConfig);
+    } else {
+      opts.setPrincipal(getAdminPrincipal());
+      vopts.setPrincipal(getAdminPrincipal());
+    }
+    TestIngest.ingest(c, opts, new BatchWriterOpts());
+    c.tableOperations().flush(tableName, null, null, true);
+    VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java
new file mode 100644
index 0000000..c06feed
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Combiner;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ClassLoaderIT extends AccumuloClusterHarness {
+
+  private static final long ZOOKEEPER_PROPAGATION_TIME = 10 * 1000;
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  private String rootPath;
+
+  @Before
+  public void checkCluster() {
+    Assume.assumeThat(getClusterType(), CoreMatchers.is(ClusterType.MINI));
+    MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) getCluster();
+    rootPath = mac.getConfig().getDir().getAbsolutePath();
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m = new Mutation("row1");
+    m.put("cf", "col1", "Test");
+    bw.addMutation(m);
+    bw.close();
+    scanCheck(c, tableName, "Test");
+    FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
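+    // copy the combiner jar into lib/ext; the cluster's classloader is expected to watch that directory, so the new class becomes loadable without a restart (hence the sleeps below)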
+    Path jarPath = new Path(rootPath + "/lib/ext/Test.jar");
+    fs.copyFromLocalFile(new Path(System.getProperty("user.dir") + "/src/test/resources/TestCombinerX.jar"), jarPath);
+    UtilWaitThread.sleep(1000);
+    IteratorSetting is = new IteratorSetting(10, "TestCombiner", "org.apache.accumulo.test.functional.TestCombiner");
+    Combiner.setColumns(is, Collections.singletonList(new IteratorSetting.Column("cf")));
+    c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.scan));
+    UtilWaitThread.sleep(ZOOKEEPER_PROPAGATION_TIME);
+    scanCheck(c, tableName, "TestX");
+    fs.delete(jarPath, true);
+    fs.copyFromLocalFile(new Path(System.getProperty("user.dir") + "/src/test/resources/TestCombinerY.jar"), jarPath);
+    UtilWaitThread.sleep(5000);
+    scanCheck(c, tableName, "TestY");
+    fs.delete(jarPath, true);
+  }
+
+  private void scanCheck(Connector c, String tableName, String expected) throws Exception {
+    Scanner bs = c.createScanner(tableName, Authorizations.EMPTY);
+    Iterator<Entry<Key,Value>> iterator = bs.iterator();
+    assertTrue(iterator.hasNext());
+    Entry<Key,Value> next = iterator.next();
+    assertFalse(iterator.hasNext());
+    assertEquals(expected, next.getValue().toString());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/CleanTmpIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CleanTmpIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CleanTmpIT.java
new file mode 100644
index 0000000..779b407
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CleanTmpIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+
+public class CleanTmpIT extends ConfigurableMacBase {
+  private static final Logger log = LoggerFactory.getLogger(CleanTmpIT.class);
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
+    cfg.setNumTservers(1);
+    // use raw local file system so walogs sync and flush will work
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+    // make a table
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    // write to it
+    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m = new Mutation("row");
+    m.put("cf", "cq", "value");
+    bw.addMutation(m);
+    bw.flush();
+
+    // Compact memory to make a file
+    c.tableOperations().compact(tableName, null, null, true, true);
+
+    // Make sure that we'll have a WAL
+    m = new Mutation("row2");
+    m.put("cf", "cq", "value");
+    bw.addMutation(m);
+    bw.close();
+
+    // create a fake _tmp file in its directory
+    String id = c.tableOperations().tableIdMap().get(tableName);
+    Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(Range.prefix(id));
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+    Path file = new Path(entry.getKey().getColumnQualifier().toString());
+
+    FileSystem fs = getCluster().getFileSystem();
+    assertTrue("Could not find file: " + file, fs.exists(file));
+    Path tabletDir = file.getParent();
+    assertNotNull("Tablet dir should not be null", tabletDir);
+    Path tmp = new Path(tabletDir, "junk.rf_tmp");
+    // Make the file
+    fs.create(tmp).close();
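+    // a stray *_tmp file mimics debris left behind by an interrupted compaction or bulk import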
+    log.info("Created tmp file {}", tmp.toString());
+    getCluster().stop();
+    getCluster().start();
+
+    Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
+    assertEquals(2, Iterators.size(scanner.iterator()));
+    // If we performed log recovery, we should have cleaned up any stray files
+    assertFalse("File still exists: " + tmp, fs.exists(tmp));
+  }
+}
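
The invariant asserted above, that no *_tmp file survives log recovery, could be checked more generally with a small helper along these lines (illustrative only, not part of the patch):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class TmpFileCheck {
      // Lists leftover *_tmp files directly under a tablet directory.
      static List<Path> findTmpFiles(FileSystem fs, Path tabletDir) throws IOException {
        List<Path> leftovers = new ArrayList<Path>();
        for (FileStatus status : fs.listStatus(tabletDir)) {
          if (status.getPath().getName().endsWith("_tmp"))
            leftovers.add(status.getPath());
        }
        return leftovers;
      }
    }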

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/CleanUpIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CleanUpIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CleanUpIT.java
new file mode 100644
index 0000000..1f6d1a0
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CleanUpIT.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CleanUp;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ensures that all threads spawned for ZooKeeper and Thrift connectivity are reaped after calling CleanUp.shutdownNow().
+ *
+ * Because this is destructive across the current context classloader, the normal teardown methods will fail (because they attempt to create a Connector). Until
+ * the ZooKeeperInstance and Connector are self-contained WRT resource management, we can't leverage the AccumuloClusterBase.
+ */
+public class CleanUpIT extends SharedMiniClusterBase {
+  private static final Logger log = LoggerFactory.getLogger(CleanUpIT.class);
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 30;
+  }
+
+  @Test
+  public void run() throws Exception {
+
+    String tableName = getUniqueNames(1)[0];
+    getConnector().tableOperations().create(tableName);
+
+    BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig());
+
+    Mutation m1 = new Mutation("r1");
+    m1.put("cf1", "cq1", 1, "5");
+
+    bw.addMutation(m1);
+
+    bw.flush();
+
+    Scanner scanner = getConnector().createScanner(tableName, new Authorizations());
+
+    int count = 0;
+    for (Entry<Key,Value> entry : scanner) {
+      count++;
+      if (!entry.getValue().toString().equals("5")) {
+        Assert.fail("Unexpected value " + entry.getValue());
+      }
+    }
+
+    Assert.assertEquals("Unexpected count", 1, count);
+
+    int threadCount = countThreads();
+    if (threadCount < 2) {
+      printThreadNames();
+      Assert.fail("Not seeing expected threads. Saw " + threadCount);
+    }
+
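+    // Destructive: after this call, no Connector, BatchWriter, or Scanner in this classloader is usable.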
+    CleanUp.shutdownNow();
+
+    Mutation m2 = new Mutation("r2");
+    m2.put("cf1", "cq1", 1, "6");
+
+    try {
+      bw.addMutation(m2);
+      bw.flush();
+      Assert.fail("batch writer did not fail");
+    } catch (Exception e) {
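+      // expected: the batch writer cannot write after shutdownNow()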
+
+    }
+
+    try {
+      // expect this to fail too; calling close anyway reaps the batch writer threads
+      bw.close();
+      Assert.fail("batch writer close not fail");
+    } catch (Exception e) {
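+      // expected: close fails as well, but it still releases the batch writer threads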
+
+    }
+
+    try {
+      count = 0;
+      Iterator<Entry<Key,Value>> iter = scanner.iterator();
+      while (iter.hasNext()) {
+        iter.next();
+        count++;
+      }
+      Assert.fail("scanner did not fail");
+    } catch (Exception e) {
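+      // expected: the scanner cannot reach the instance after shutdown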
+
+    }
+
+    threadCount = countThreads();
+    if (threadCount > 0) {
+      printThreadNames();
+      Assert.fail("Threads did not go away. Saw " + threadCount);
+    }
+  }
+
+  private void printThreadNames() {
+    Set<Thread> threads = Thread.getAllStackTraces().keySet();
+    Exception e = new Exception();
+    for (Thread thread : threads) {
+      e.setStackTrace(thread.getStackTrace());
+      log.info("thread name: " + thread.getName(), e);
+    }
+  }
+
+  /**
+   * Count the ZooKeeper and Thrift client threads that should be reaped by CleanUp.shutdownNow().
+   */
+  private int countThreads() {
+    int count = 0;
+    Set<Thread> threads = Thread.getAllStackTraces().keySet();
+    for (Thread thread : threads) {
+
+      if (thread.getName().toLowerCase().contains("sendthread") || thread.getName().toLowerCase().contains("eventthread"))
+        count++;
+
+      if (thread.getName().toLowerCase().contains("thrift") && thread.getName().toLowerCase().contains("pool"))
+        count++;
+    }
+
+    return count;
+  }
+}
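
Outside of tests, the pattern exercised here is the same: a client calls CleanUp.shutdownNow() exactly once, when it is completely finished with Accumulo, since the call is destructive for every Instance and Connector in the classloader. A minimal usage sketch (instance name, ZooKeeper quorum, and credentials are placeholders):

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Instance;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.accumulo.core.util.CleanUp;

    public class ClientShutdownExample {
      public static void main(String[] args) throws Exception {
        Instance instance = new ZooKeeperInstance("instanceName", "zkhost:2181");
        Connector conn = instance.getConnector("user", new PasswordToken("secret"));
        try {
          conn.tableOperations().list(); // ... normal client work ...
        } finally {
          // Destructive: afterwards no Connector or Instance in this
          // classloader can be used again.
          CleanUp.shutdownNow();
        }
      }
    }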

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/CloneTestIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CloneTestIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CloneTestIT.java
new file mode 100644
index 0000000..b3d0ab5
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CloneTestIT.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.cluster.AccumuloCluster;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.DiskUsage;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+/**
+ * Tests table cloning: cloning with property overrides and exclusions, deleting the source and the clone, and cloning a table with splits.
+ */
+public class CloneTestIT extends AccumuloClusterHarness {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Test
+  public void testProps() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+
+    Connector c = getConnector();
+
+    c.tableOperations().create(table1);
+
+    c.tableOperations().setProperty(table1, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1M");
+    c.tableOperations().setProperty(table1, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), "2M");
+    c.tableOperations().setProperty(table1, Property.TABLE_FILE_MAX.getKey(), "23");
+
+    BatchWriter bw = writeData(table1, c);
+
+    Map<String,String> props = new HashMap<String,String>();
+    props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "500K");
+
+    Set<String> exclude = new HashSet<String>();
+    exclude.add(Property.TABLE_FILE_MAX.getKey());
+
+    c.tableOperations().clone(table1, table2, true, props, exclude);
+
+    Mutation m3 = new Mutation("009");
+    m3.put("data", "x", "1");
+    m3.put("data", "y", "2");
+    bw.addMutation(m3);
+    bw.close();
+
+    checkData(table2, c);
+
+    checkMetadata(table2, c);
+
+    HashMap<String,String> tableProps = new HashMap<String,String>();
+    for (Entry<String,String> prop : c.tableOperations().getProperties(table2)) {
+      tableProps.put(prop.getKey(), prop.getValue());
+    }
+
+    Assert.assertEquals("500K", tableProps.get(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey()));
+    Assert.assertEquals(Property.TABLE_FILE_MAX.getDefaultValue(), tableProps.get(Property.TABLE_FILE_MAX.getKey()));
+    Assert.assertEquals("2M", tableProps.get(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey()));
+
+    c.tableOperations().delete(table1);
+    c.tableOperations().delete(table2);
+
+  }
+
+  private void checkData(String table2, Connector c) throws TableNotFoundException {
+    Scanner scanner = c.createScanner(table2, Authorizations.EMPTY);
+
+    HashMap<String,String> expected = new HashMap<String,String>();
+    expected.put("001:x", "9");
+    expected.put("001:y", "7");
+    expected.put("008:x", "3");
+    expected.put("008:y", "4");
+
+    HashMap<String,String> actual = new HashMap<String,String>();
+
+    for (Entry<Key,Value> entry : scanner)
+      actual.put(entry.getKey().getRowData().toString() + ":" + entry.getKey().getColumnQualifierData().toString(), entry.getValue().toString());
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  private void checkMetadata(String table, Connector conn) throws Exception {
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+    MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(s);
+    String tableId = conn.tableOperations().tableIdMap().get(table);
+
+    Assert.assertNotNull("Could not get table id for " + table, tableId);
+
+    s.setRange(Range.prefix(tableId));
+
+    Key k;
+    Text cf = new Text(), cq = new Text();
+    int itemsInspected = 0;
+    for (Entry<Key,Value> entry : s) {
+      itemsInspected++;
+      k = entry.getKey();
+      k.getColumnFamily(cf);
+      k.getColumnQualifier(cq);
+
+      if (cf.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) {
+        Path p = new Path(cq.toString());
+        FileSystem fs = cluster.getFileSystem();
+        Assert.assertTrue("File does not exist: " + p, fs.exists(p));
+      } else if (cf.equals(MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily())) {
+        Assert.assertEquals("Saw unexpected cq", MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), cq);
+        Path tabletDir = new Path(entry.getValue().toString());
+        Path tableDir = tabletDir.getParent();
+        Path tablesDir = tableDir.getParent();
+
+        Assert.assertEquals(ServerConstants.TABLE_DIR, tablesDir.getName());
+      } else {
+        Assert.fail("Got unexpected key-value: " + entry);
+        throw new RuntimeException();
+      }
+    }
+
+    Assert.assertTrue("Expected to find metadata entries", itemsInspected > 0);
+  }
+
+  private BatchWriter writeData(String table1, Connector c) throws TableNotFoundException, MutationsRejectedException {
+    BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+
+    Mutation m1 = new Mutation("001");
+    m1.put("data", "x", "9");
+    m1.put("data", "y", "7");
+
+    Mutation m2 = new Mutation("008");
+    m2.put("data", "x", "3");
+    m2.put("data", "y", "4");
+
+    bw.addMutation(m1);
+    bw.addMutation(m2);
+
+    bw.flush();
+    return bw;
+  }
+
+  @Test
+  public void testDeleteClone() throws Exception {
+    String[] tableNames = getUniqueNames(3);
+    String table1 = tableNames[0];
+    String table2 = tableNames[1];
+    String table3 = tableNames[2];
+
+    Connector c = getConnector();
+    AccumuloCluster cluster = getCluster();
+    Assume.assumeTrue(cluster instanceof MiniAccumuloClusterImpl);
+    MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) cluster;
+    String rootPath = mac.getConfig().getDir().getAbsolutePath();
+
+    // verify that deleting a new table removes the files
+    c.tableOperations().create(table3);
+    writeData(table3, c).close();
+    c.tableOperations().flush(table3, null, null, true);
+    // check for files
+    FileSystem fs = getCluster().getFileSystem();
+    String id = c.tableOperations().tableIdMap().get(table3);
+    FileStatus[] status = fs.listStatus(new Path(rootPath + "/accumulo/tables/" + id));
+    assertTrue(status.length > 0);
+    // verify disk usage
+    List<DiskUsage> diskUsage = c.tableOperations().getDiskUsage(Collections.singleton(table3));
+    assertEquals(1, diskUsage.size());
+    assertTrue(diskUsage.get(0).getUsage() > 100);
+    // delete the table
+    c.tableOperations().delete(table3);
+    // verify it's gone from the file system
+    Path tablePath = new Path(rootPath + "/accumulo/tables/" + id);
+    if (fs.exists(tablePath)) {
+      status = fs.listStatus(tablePath);
+      assertTrue(status == null || status.length == 0);
+    }
+
+    c.tableOperations().create(table1);
+
+    BatchWriter bw = writeData(table1, c);
+
+    Map<String,String> props = new HashMap<String,String>();
+    props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "500K");
+
+    Set<String> exclude = new HashSet<String>();
+    exclude.add(Property.TABLE_FILE_MAX.getKey());
+
+    c.tableOperations().clone(table1, table2, true, props, exclude);
+
+    Mutation m3 = new Mutation("009");
+    m3.put("data", "x", "1");
+    m3.put("data", "y", "2");
+    bw.addMutation(m3);
+    bw.close();
+
+    // delete source table, should not affect clone
+    c.tableOperations().delete(table1);
+
+    checkData(table2, c);
+
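+    // compacting the clone rewrites the files it shared with the deleted source into files the clone owns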
+    c.tableOperations().compact(table2, null, null, true, true);
+
+    checkData(table2, c);
+
+    c.tableOperations().delete(table2);
+
+  }
+
+  @Test
+  public void testCloneWithSplits() throws Exception {
+    Connector conn = getConnector();
+
+    List<Mutation> mutations = new ArrayList<Mutation>();
+    TreeSet<Text> splits = new TreeSet<Text>();
+    for (int i = 0; i < 10; i++) {
+      splits.add(new Text(Integer.toString(i)));
+      Mutation m = new Mutation(Integer.toString(i));
+      m.put("", "", "");
+      mutations.add(m);
+    }
+
+    String[] tables = getUniqueNames(2);
+
+    conn.tableOperations().create(tables[0]);
+
+    conn.tableOperations().addSplits(tables[0], splits);
+
+    BatchWriter bw = conn.createBatchWriter(tables[0], new BatchWriterConfig());
+    bw.addMutations(mutations);
+    bw.close();
+
+    conn.tableOperations().clone(tables[0], tables[1], true, null, null);
+
+    conn.tableOperations().deleteRows(tables[1], new Text("4"), new Text("8"));
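+    // deleteRows is exclusive of the start row and inclusive of the end row, so rows "5" through "8" are removed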
+
+    List<String> rows = Arrays.asList("0", "1", "2", "3", "4", "9");
+    List<String> actualRows = new ArrayList<String>();
+    for (Entry<Key,Value> entry : conn.createScanner(tables[1], Authorizations.EMPTY)) {
+      actualRows.add(entry.getKey().getRow().toString());
+    }
+
+    Assert.assertEquals(rows, actualRows);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/CombinerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CombinerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CombinerIT.java
new file mode 100644
index 0000000..d4ef18e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CombinerIT.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.LongCombiner.Type;
+import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.junit.Test;
+
+public class CombinerIT extends AccumuloClusterHarness {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  private void checkSum(String tableName, Connector c) throws Exception {
+    Scanner s = c.createScanner(tableName, Authorizations.EMPTY);
+    Iterator<Entry<Key,Value>> i = s.iterator();
+    assertTrue(i.hasNext());
+    Entry<Key,Value> entry = i.next();
+    assertEquals("45", entry.getValue().toString());
+    assertFalse(i.hasNext());
+  }
+
+  @Test
+  public void aggregationTest() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    IteratorSetting setting = new IteratorSetting(10, SummingCombiner.class);
+    SummingCombiner.setEncodingType(setting, Type.STRING);
+    SummingCombiner.setColumns(setting, Collections.singletonList(new IteratorSetting.Column("cf")));
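+    // attaching without scopes applies the combiner at scan time and during minor and major compactions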
+    c.tableOperations().attachIterator(tableName, setting);
+    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
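+    // write ten versions of one cell with values 0 through 9; the combiner should sum them to 45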
+    for (int i = 0; i < 10; i++) {
+      Mutation m = new Mutation("row1");
+      m.put("cf".getBytes(), "col1".getBytes(), ("" + i).getBytes());
+      bw.addMutation(m);
+    }
+    bw.close();
+    checkSum(tableName, c);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
new file mode 100644
index 0000000..862365f
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.cli.ClientOpts.Password;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+
+public class CompactionIT extends AccumuloClusterHarness {
+  private static final Logger log = LoggerFactory.getLogger(CompactionIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+    cfg.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN, "4");
+    cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
+    cfg.setProperty(Property.TSERV_MAJC_MAXCONCURRENT, "1");
+    // use raw local file system so walogs sync and flush will work
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  private String majcThreadMaxOpen, majcDelay, majcMaxConcurrent;
+
+  @Before
+  public void alterConfig() throws Exception {
+    if (ClusterType.STANDALONE == getClusterType()) {
+      InstanceOperations iops = getConnector().instanceOperations();
+      Map<String,String> config = iops.getSystemConfiguration();
+      majcThreadMaxOpen = config.get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey());
+      majcDelay = config.get(Property.TSERV_MAJC_DELAY.getKey());
+      majcMaxConcurrent = config.get(Property.TSERV_MAJC_MAXCONCURRENT.getKey());
+
+      iops.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "4");
+      iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), "1");
+      iops.setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), "1");
+
+      getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+      getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+    }
+  }
+
+  @After
+  public void resetConfig() throws Exception {
+    // If alterConfig() overrode the system config, restore the original values
+    if (null != majcThreadMaxOpen) {
+      InstanceOperations iops = getConnector().instanceOperations();
+
+      iops.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), majcThreadMaxOpen);
+      iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay);
+      iops.setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), majcMaxConcurrent);
+
+      getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+      getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    final Connector c = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
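+    // a compaction ratio of 1.0 makes major compactions maximally aggressive (any tablet with more than one file qualifies)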
+    c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1.0");
+    FileSystem fs = getFileSystem();
+    Path root = new Path(cluster.getTemporaryPath(), getClass().getName());
+    Path testrf = new Path(root, "testrf");
+    FunctionalTestUtils.createRFiles(c, fs, testrf.toString(), 500000, 59, 4);
+
+    FunctionalTestUtils.bulkImport(c, fs, tableName, testrf.toString());
+    int beforeCount = countFiles(c);
+
+    final AtomicBoolean fail = new AtomicBoolean(false);
+    final ClientConfiguration clientConf = cluster.getClientConfig();
+    for (int count = 0; count < 5; count++) {
+      List<Thread> threads = new ArrayList<Thread>();
+      final int span = 500000 / 59;
+      for (int i = 0; i < 500000; i += 500000 / 59) {
+        final int finalI = i;
+        Thread t = new Thread() {
+          @Override
+          public void run() {
+            try {
+              VerifyIngest.Opts opts = new VerifyIngest.Opts();
+              opts.startRow = finalI;
+              opts.rows = span;
+              opts.random = 56;
+              opts.dataSize = 50;
+              opts.cols = 1;
+              opts.setTableName(tableName);
+              if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+                opts.updateKerberosCredentials(clientConf);
+              } else {
+                opts.setPrincipal(getAdminPrincipal());
+                PasswordToken passwordToken = (PasswordToken) getAdminToken();
+                opts.setPassword(new Password(new String(passwordToken.getPassword(), UTF_8)));
+              }
+              VerifyIngest.verifyIngest(c, opts, new ScannerOpts());
+            } catch (Exception ex) {
+              log.warn("Got exception verifying data", ex);
+              fail.set(true);
+            }
+          }
+        };
+        t.start();
+        threads.add(t);
+      }
+      for (Thread t : threads)
+        t.join();
+      assertFalse("Failed to successfully run all threads, Check the test output for error", fail.get());
+    }
+
+    int finalCount = countFiles(c);
+    assertTrue(finalCount < beforeCount);
+    try {
+      getClusterControl().adminStopAll();
+    } finally {
+      // Make sure the internal state in the cluster is reset (e.g. processes in MAC)
+      getCluster().stop();
+      if (ClusterType.STANDALONE == getClusterType()) {
+        // Then restart things for the next test if it's a standalone
+        getCluster().start();
+      }
+    }
+  }
+
+  private int countFiles(Connector c) throws Exception {
+    Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.TabletColumnFamily.NAME);
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+    return Iterators.size(s.iterator());
+  }
+
+}