You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/06/04 20:53:16 UTC
[35/43] accumulo git commit: ACCUMULO-3871 move ITs into distro jar,
stop building test jar
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
new file mode 100644
index 0000000..7c05a0f
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class BatchWriterFlushIT extends AccumuloClusterHarness {
+
+ private static final int NUM_TO_FLUSH = 100000;
+
+ @Override
+ protected int defaultTimeoutSeconds() { // timeout for this test class, in seconds (per the method name)
+ return 90;
+ }
+
+ /** Creates one table per scenario, then runs the explicit-flush check followed by the latency check. */
+ @Test
+ public void run() throws Exception {
+ final Connector conn = getConnector();
+ final String[] names = getUniqueNames(2);
+ final String flushTable = names[0];
+ final String latencyTable = names[1];
+ conn.tableOperations().create(flushTable);
+ conn.tableOperations().create(latencyTable);
+ runFlushTest(flushTable);
+ runLatencyTest(latencyTable);
+ }
+
+ /** Checks that a mutation is not visible before the 1 s max latency elapses and is visible afterwards; the writer is always closed. */
+ private void runLatencyTest(String tableName) throws Exception {
+ // max latency is 1 second: nothing may be visible at 0.5 s, and the mutation must be visible by 2 s
+ BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig().setMaxLatency(1000, TimeUnit.MILLISECONDS));
+
+ try {
+ Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY);
+
+ Mutation m = new Mutation(new Text(String.format("r_%10d", 1)));
+ m.put(new Text("cf"), new Text("cq"), new Value("1".getBytes(UTF_8)));
+ bw.addMutation(m);
+
+ UtilWaitThread.sleep(500);
+ if (Iterators.size(scanner.iterator()) != 0) {
+ throw new Exception("Flushed too soon");
+ }
+
+ // 500 ms + 1500 ms = 2 s in total, past the configured 1 s latency
+ UtilWaitThread.sleep(1500);
+ if (Iterators.size(scanner.iterator()) != 1) {
+ throw new Exception("Did not flush");
+ }
+ } finally {
+ // close the writer even when one of the checks above throws
+ bw.close();
+ }
+ }
+
+ /**
+ * Writes four batches of NUM_TO_FLUSH rows, flushing after each batch and verifying the batch by ten random lookups and a full range scan,
+ * then confirms that the closed writer rejects further mutations. The writer is closed even when a check fails.
+ */
+ private void runFlushTest(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException,
+ Exception {
+ BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig());
+ Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY);
+ Random r = new Random();
+
+ try {
+ for (int i = 0; i < 4; i++) {
+ for (int j = 0; j < NUM_TO_FLUSH; j++) {
+ int row = i * NUM_TO_FLUSH + j;
+ Mutation m = new Mutation(new Text(String.format("r_%10d", row)));
+ // explicit charset, as in runLatencyTest, so the stored bytes do not depend on the platform default
+ m.put(new Text("cf"), new Text("cq"), new Value(Integer.toString(row).getBytes(UTF_8)));
+ bw.addMutation(m);
+ }
+
+ bw.flush();
+
+ // do a few random lookups into the data just flushed
+ for (int k = 0; k < 10; k++) {
+ int rowToLookup = r.nextInt(NUM_TO_FLUSH) + i * NUM_TO_FLUSH;
+ scanner.setRange(new Range(new Text(String.format("r_%10d", rowToLookup))));
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+ if (!iter.hasNext())
+ throw new Exception(" row " + rowToLookup + " not found after flush");
+
+ Entry<Key,Value> entry = iter.next();
+
+ if (iter.hasNext())
+ throw new Exception("Scanner returned too much");
+
+ verifyEntry(rowToLookup, entry);
+ }
+
+ // scan all data just flushed
+ scanner.setRange(new Range(new Text(String.format("r_%10d", i * NUM_TO_FLUSH)), true, new Text(String.format("r_%10d", (i + 1) * NUM_TO_FLUSH)), false));
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+ for (int j = 0; j < NUM_TO_FLUSH; j++) {
+ int row = i * NUM_TO_FLUSH + j;
+ if (!iter.hasNext())
+ throw new Exception("Scan stopped permaturely at " + row);
+
+ verifyEntry(row, iter.next());
+ }
+
+ if (iter.hasNext())
+ throw new Exception("Scanner returned too much");
+ }
+ } finally {
+ // close the writer on every path so a failed check does not leave it open
+ bw.close();
+ }
+
+ // test adding a mutation to a closed batch writer
+ boolean caught = false;
+ try {
+ bw.addMutation(new Mutation(new Text("foobar")));
+ } catch (IllegalStateException ise) {
+ caught = true;
+ }
+
+ if (!caught) {
+ throw new Exception("Adding to closed batch writer did not fail");
+ }
+ }
+
+ private void verifyEntry(int row, Entry<Key,Value> entry) throws Exception {
+ if (!entry.getKey().getRow().toString().equals(String.format("r_%10d", row))) {
+ throw new Exception("Unexpected key returned, expected " + row + " got " + entry.getKey());
+ }
+
+ if (!entry.getValue().toString().equals("" + row)) {
+ throw new Exception("Unexpected value, expected " + row + " got " + entry.getValue());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BigRootTabletIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BigRootTabletIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BigRootTabletIT.java
new file mode 100644
index 0000000..11dcb66
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BigRootTabletIT.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class BigRootTabletIT extends AccumuloClusterHarness {
+ // ACCUMULO-542: A large root tablet will fail to load if it doesn't fit in the tserver scan buffers
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ Map<String,String> siteConfig = cfg.getSiteConfig();
+ siteConfig.put(Property.TABLE_SCAN_MAXMEM.getKey(), "1024"); // small scan memory limit; presumably what makes the root tablet "big" here (ACCUMULO-542) - confirm
+ siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "60m"); // major-compaction delay set to 60m
+ cfg.setSiteConfig(siteConfig);
+ }
+
+ @Override
+ protected int defaultTimeoutSeconds() { // timeout for this test class, in seconds (per the method name)
+ return 4 * 60; // four minutes
+ }
+
+ /** Splits the metadata table, creates ten tables (flushing metadata and root after each), restarts the cluster and expects a non-empty root scan. */
+ @Test
+ public void test() throws Exception {
+ final Connector conn = getConnector();
+ conn.tableOperations().addSplits(MetadataTable.NAME, FunctionalTestUtils.splits("0 1 2 3 4 5 6 7 8 9 a".split(" ")));
+ for (String table : getUniqueNames(10)) {
+ conn.tableOperations().create(table);
+ conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+ conn.tableOperations().flush(RootTable.NAME, null, null, true);
+ }
+ cluster.stop();
+ cluster.start();
+ assertTrue(Iterators.size(conn.createScanner(RootTable.NAME, Authorizations.EMPTY).iterator()) > 0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BinaryIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BinaryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BinaryIT.java
new file mode 100644
index 0000000..85716d5
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BinaryIT.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.TestBinaryRows;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class BinaryIT extends AccumuloClusterHarness {
+
+ @Override
+ protected int defaultTimeoutSeconds() { // timeout for this test class, in seconds (per the method name)
+ return 90;
+ }
+
+ @Test
+ public void test() throws Exception { // runs the shared workload against a freshly created table with no added splits
+ final Connector conn = getConnector();
+ final String table = getUniqueNames(1)[0];
+ conn.tableOperations().create(table);
+ runTest(conn, table);
+ }
+
+ @Test
+ public void testPreSplit() throws Exception { // same workload as test(), on a table pre-split at "8" and "256"
+ final String table = getUniqueNames(1)[0];
+ final Connector conn = getConnector();
+ conn.tableOperations().create(table);
+ final SortedSet<Text> splitPoints = new TreeSet<Text>();
+ for (String point : new String[] {"8", "256"})
+ splitPoints.add(new Text(point));
+ conn.tableOperations().addSplits(table, splitPoints);
+ runTest(conn, table);
+ }
+
+ /** Drives TestBinaryRows through six phases on {@code tableName}: ingest, verify, delete, verify, randomLookups and verifyDeleted. */
+ public static void runTest(Connector c, String tableName) throws Exception {
+ final BatchWriterOpts bwOpts = new BatchWriterOpts();
+ final ScannerOpts scanOpts = new ScannerOpts();
+ final TestBinaryRows.Opts opts = new TestBinaryRows.Opts();
+ opts.setTableName(tableName);
+
+ // write with start=0, num=100000 and read the same window back
+ runPhase(c, opts, bwOpts, scanOpts, "ingest", 0, 100000);
+ runPhase(c, opts, bwOpts, scanOpts, "verify", 0, 100000);
+
+ // delete the window start=25000, num=50000
+ runPhase(c, opts, bwOpts, scanOpts, "delete", 25000, 50000);
+
+ // check the windows on either side of the deletion, then the deleted window itself
+ runPhase(c, opts, bwOpts, scanOpts, "verify", 0, 25000);
+ runPhase(c, opts, bwOpts, scanOpts, "randomLookups", 75000, 25000);
+ runPhase(c, opts, bwOpts, scanOpts, "verifyDeleted", 25000, 50000);
+ }
+
+ /** Sets the start, num and mode fields of {@code opts} for one phase and runs TestBinaryRows with them. */
+ private static void runPhase(Connector c, TestBinaryRows.Opts opts, BatchWriterOpts bwOpts, ScannerOpts scanOpts, String mode, int start, int num)
+ throws Exception {
+ opts.start = start;
+ opts.num = num;
+ opts.mode = mode;
+ TestBinaryRows.runTest(c, opts, bwOpts, scanOpts);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BinaryStressIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BinaryStressIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BinaryStressIT.java
new file mode 100644
index 0000000..440d2cf
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BinaryStressIT.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BinaryStressIT extends AccumuloClusterHarness {
+
+ @Override
+ protected int defaultTimeoutSeconds() { // timeout for this test class, in seconds (per the method name)
+ return 4 * 60; // four minutes
+ }
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
+ cfg.setProperty(Property.TSERV_MAXMEM, "50K"); // same value alterConfig() applies on a non-mini cluster
+ cfg.setProperty(Property.TSERV_MAJC_DELAY, "0"); // same value alterConfig() applies on a non-mini cluster
+ }
+
+ private String majcDelay, maxMem;
+
+ /** On a non-mini cluster: remembers the current major-compaction delay and tserver max memory, overrides both, and restarts the tablet servers. */
+ @Before
+ public void alterConfig() throws Exception {
+ if (ClusterType.MINI != getClusterType()) {
+ final InstanceOperations instanceOps = getConnector().instanceOperations();
+ final Map<String,String> systemConfig = instanceOps.getSystemConfiguration();
+ final String delayKey = Property.TSERV_MAJC_DELAY.getKey();
+ final String memKey = Property.TSERV_MAXMEM.getKey();
+ majcDelay = systemConfig.get(delayKey);
+ maxMem = systemConfig.get(memKey);
+ instanceOps.setProperty(delayKey, "0");
+ instanceOps.setProperty(memKey, "50K");
+
+ getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+ getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+ }
+ }
+
+ @After
+ public void resetConfig() throws Exception {
+ if (majcDelay == null) {
+ return; // no saved delay value, so there is nothing to restore
+ }
+ final InstanceOperations instanceOps = getConnector().instanceOperations();
+ instanceOps.setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay);
+ instanceOps.setProperty(Property.TSERV_MAXMEM.getKey(), maxMem);
+ getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+ getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+ }
+
+ /** Runs the BinaryIT workload on a table with a 10K split threshold and expects at least 8 distinct metadata rows under the table's id prefix. */
+ @Test
+ public void binaryStressTest() throws Exception {
+ final Connector conn = getConnector();
+ final String table = getUniqueNames(1)[0];
+ conn.tableOperations().create(table);
+ conn.tableOperations().setProperty(table, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+ BinaryIT.runTest(conn, table);
+ final String tableId = conn.tableOperations().tableIdMap().get(table);
+ final Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ metaScanner.setRange(Range.prefix(tableId));
+ final Set<Text> tabletRows = new HashSet<>();
+ for (Entry<Key,Value> metaEntry : metaScanner)
+ tabletRows.add(metaEntry.getKey().getRow());
+ assertTrue("Expected at least 8 tablets, saw " + tabletRows.size(), tabletRows.size() > 7);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BloomFilterIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BloomFilterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BloomFilterIT.java
new file mode 100644
index 0000000..fbbe542
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BloomFilterIT.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.keyfunctor.ColumnFamilyFunctor;
+import org.apache.accumulo.core.file.keyfunctor.ColumnQualifierFunctor;
+import org.apache.accumulo.core.file.keyfunctor.RowFunctor;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BloomFilterIT extends AccumuloClusterHarness {
+ private static final Logger log = LoggerFactory.getLogger(BloomFilterIT.class);
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setDefaultMemory(1, MemoryUnit.GIGABYTE); // default memory of 1 GB
+ cfg.setNumTservers(1); // a single tablet server
+ Map<String,String> siteConfig = cfg.getSiteConfig();
+ siteConfig.put(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "10M");
+ cfg.setSiteConfig(siteConfig);
+ }
+
+ @Override
+ protected int defaultTimeoutSeconds() { // timeout for this test class, in seconds (per the method name)
+ return 6 * 60; // six minutes
+ }
+
+ @Test
+ public void test() throws Exception { // times the same queries before and after enabling bloom filters and requires a speed-up (see timeCheck)
+ Connector c = getConnector();
+ final String readAhead = c.instanceOperations().getSystemConfiguration().get(Property.TSERV_READ_AHEAD_MAXCONCURRENT.getKey());
+ c.instanceOperations().setProperty(Property.TSERV_READ_AHEAD_MAXCONCURRENT.getKey(), "1");
+ try {
+ Thread.sleep(1000); // NOTE(review): presumably gives the property change above time to take effect - confirm
+ final String[] tables = getUniqueNames(4);
+ for (String table : tables) {
+ TableOperations tops = c.tableOperations();
+ tops.create(table);
+ tops.setProperty(table, Property.TABLE_INDEXCACHE_ENABLED.getKey(), "false");
+ tops.setProperty(table, Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "false");
+ tops.setProperty(table, Property.TABLE_BLOOM_SIZE.getKey(), "2000000");
+ tops.setProperty(table, Property.TABLE_BLOOM_ERRORRATE.getKey(), "1%");
+ tops.setProperty(table, Property.TABLE_BLOOM_LOAD_THRESHOLD.getKey(), "0");
+ tops.setProperty(table, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64K");
+ }
+ log.info("Writing");
+ write(c, tables[0], 1, 0, 2000000000, 500);
+ write(c, tables[1], 2, 0, 2000000000, 500);
+ write(c, tables[2], 3, 0, 2000000000, 500);
+ log.info("Writing complete");
+
+ // test inserting an empty key
+ BatchWriter bw = c.createBatchWriter(tables[3], new BatchWriterConfig());
+ Mutation m = new Mutation(new Text(""));
+ m.put(new Text(""), new Text(""), new Value("foo1".getBytes()));
+ bw.addMutation(m);
+ bw.close();
+ c.tableOperations().flush(tables[3], null, null, true);
+
+ for (String table : Arrays.asList(tables[0], tables[1], tables[2])) {
+ c.tableOperations().compact(table, null, null, true, true);
+ }
+
+ // ensure compactions are finished
+ for (String table : tables) {
+ FunctionalTestUtils.checkRFiles(c, table, 1, 1, 1, 1);
+ }
+
+ // baseline: time the queries before bloom filters are enabled, for comparison with the bloom-enabled run below
+ log.info("Base query");
+ long t1 = query(c, tables[0], 1, 0, 2000000000, 5000, 500);
+ long t2 = query(c, tables[1], 2, 0, 2000000000, 5000, 500);
+ long t3 = query(c, tables[2], 3, 0, 2000000000, 5000, 500);
+ log.info("Base query complete");
+
+ log.info("Rewriting with bloom filters");
+ c.tableOperations().setProperty(tables[0], Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+ c.tableOperations().setProperty(tables[0], Property.TABLE_BLOOM_KEY_FUNCTOR.getKey(), RowFunctor.class.getName());
+
+ c.tableOperations().setProperty(tables[1], Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+ c.tableOperations().setProperty(tables[1], Property.TABLE_BLOOM_KEY_FUNCTOR.getKey(), ColumnFamilyFunctor.class.getName());
+
+ c.tableOperations().setProperty(tables[2], Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+ c.tableOperations().setProperty(tables[2], Property.TABLE_BLOOM_KEY_FUNCTOR.getKey(), ColumnQualifierFunctor.class.getName());
+
+ c.tableOperations().setProperty(tables[3], Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+ c.tableOperations().setProperty(tables[3], Property.TABLE_BLOOM_KEY_FUNCTOR.getKey(), RowFunctor.class.getName());
+
+ // ensure the updates to zookeeper propagate
+ UtilWaitThread.sleep(500);
+
+ c.tableOperations().compact(tables[3], null, null, false, true);
+ c.tableOperations().compact(tables[0], null, null, false, true);
+ c.tableOperations().compact(tables[1], null, null, false, true);
+ c.tableOperations().compact(tables[2], null, null, false, true);
+ log.info("Rewriting with bloom filters complete");
+
+ // these queries should only run quickly if bloom
+ // filters are working
+ log.info("Bloom query");
+ long tb1 = query(c, tables[0], 1, 0, 2000000000, 5000, 500);
+ long tb2 = query(c, tables[1], 2, 0, 2000000000, 5000, 500);
+ long tb3 = query(c, tables[2], 3, 0, 2000000000, 5000, 500);
+ log.info("Bloom query complete");
+ timeCheck(t1 + t2 + t3, tb1 + tb2 + tb3);
+
+ // test querying for empty key
+ Scanner scanner = c.createScanner(tables[3], Authorizations.EMPTY);
+ scanner.setRange(new Range(new Text("")));
+ // NOTE(review): next() is called without hasNext(); an empty scan presumably fails inside next() rather than with the message below
+ if (!scanner.iterator().next().getValue().toString().equals("foo1")) {
+ throw new Exception("Did not see foo1");
+ }
+ } finally { // restore the read-ahead setting saved at the top, whatever happened above
+ c.instanceOperations().setProperty(Property.TSERV_READ_AHEAD_MAXCONCURRENT.getKey(), readAhead);
+ }
+ }
+
+ private void timeCheck(long t1, long t2) throws Exception {
+ double improvement = (t1 - t2) * 1.0 / t1;
+ if (improvement < .1) {
+ throw new Exception("Queries had less than 10% improvement (old: " + t1 + " new: " + t2 + " improvement: " + (improvement * 100) + "%)");
+ }
+ log.info(String.format("Improvement: %.2f%% (%d vs %d)", (improvement * 100), t1, t2));
+ }
+
+ /** Scans {@code num} seeded-random keys in [start, end) at the given depth (1 row, 2 family, 3 qualifier); returns the scan time in ms. */
+ private long query(Connector c, String table, int depth, long start, long end, int num, int step) throws Exception {
+ Random r = new Random(42);
+ HashSet<Long> expected = new HashSet<Long>();
+ List<Range> ranges = new ArrayList<Range>(num);
+ Text key = new Text();
+ Text row = new Text("row"), cq = new Text("cq"), cf = new Text("cf");
+
+ for (int i = 0; i < num; ++i) {
+ long k = ((r.nextLong() & Long.MAX_VALUE) % (end - start)) + start;
+ key.set(String.format("k_%010d", k));
+ Range range;
+ Key acuKey;
+ // write() stores start, start + step, ...; test the offset from start so a non-zero start is handled as well
+ if ((k - start) % step == 0) {
+ expected.add(k);
+ }
+
+ switch (depth) {
+ case 1:
+ range = new Range(new Text(key));
+ break;
+ case 2:
+ acuKey = new Key(row, key, cq);
+ range = new Range(acuKey, true, acuKey.followingKey(PartialKey.ROW_COLFAM), false);
+ break;
+ case 3:
+ acuKey = new Key(row, cf, key);
+ range = new Range(acuKey, true, acuKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL), false);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported depth " + depth);
+ }
+ ranges.add(range);
+ }
+
+ BatchScanner bs = c.createBatchScanner(table, Authorizations.EMPTY, 1);
+ try {
+ bs.setRanges(ranges);
+ long t1 = System.currentTimeMillis();
+ for (Entry<Key,Value> entry : bs) {
+ long v = Long.parseLong(entry.getValue().toString());
+ if (!expected.remove(v)) {
+ throw new Exception("Got unexpected return " + entry.getKey() + " " + entry.getValue());
+ }
+ }
+ long t2 = System.currentTimeMillis();
+ if (expected.size() > 0) {
+ throw new Exception("Did not get all expected values " + expected.size());
+ }
+ return t2 - t1;
+ } finally {
+ bs.close(); // close the scanner on every path, including when a check above throws
+ }
+ }
+
+ /** Writes one entry per {@code step}-th value in [start, end), keyed at the given depth (1 row, 2 family, 3 qualifier), then flushes the table. */
+ private void write(Connector c, String table, int depth, long start, long end, int step) throws Exception {
+ BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+ try {
+ for (long i = start; i < end; i += step) {
+ String key = String.format("k_%010d", i);
+ Value value = new Value(Long.toString(i).getBytes(java.nio.charset.StandardCharsets.UTF_8)); // explicit charset, not the platform default
+ Mutation m;
+ switch (depth) {
+ case 1:
+ m = new Mutation(new Text(key));
+ m.put(new Text("cf"), new Text("cq"), value);
+ break;
+ case 2:
+ m = new Mutation(new Text("row"));
+ m.put(new Text(key), new Text("cq"), value);
+ break;
+ case 3:
+ m = new Mutation(new Text("row"));
+ m.put(new Text("cf"), new Text(key), value);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported depth " + depth);
+ }
+ bw.addMutation(m);
+ }
+ } finally {
+ bw.close(); // close the writer even when adding a mutation fails
+ }
+ c.tableOperations().flush(table, null, null, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
new file mode 100644
index 0000000..1abafeb
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class BulkFileIT extends AccumuloClusterHarness {
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) {
+ cfg.setMemory(ServerType.TABLET_SERVER, 128 * 4, MemoryUnit.MEGABYTE); // 512 MB for tablet servers
+ }
+
+ @Override
+ protected int defaultTimeoutSeconds() { // timeout for this test class, in seconds (per the method name)
+ return 4 * 60; // four minutes
+ }
+
+ /** Bulk-imports three RFiles holding rows 0000-1999 into a table with five split points, then checks the file counts and the data. */
+ @Test
+ public void testBulkFile() throws Exception {
+ final Connector conn = getConnector();
+ final String table = getUniqueNames(1)[0];
+ conn.tableOperations().create(table);
+
+ final SortedSet<Text> splitPoints = new TreeSet<Text>();
+ for (String point : "0333 0666 0999 1333 1666".split(" ")) {
+ splitPoints.add(new Text(point));
+ }
+ conn.tableOperations().addSplits(table, splitPoints);
+
+ final Configuration hadoopConf = new Configuration();
+ final AccumuloConfiguration accumuloConf = new ServerConfigurationFactory(conn.getInstance()).getConfiguration();
+ final FileSystem fs = getCluster().getFileSystem();
+
+ // remove any existing staging directory before writing
+ final String dir = cluster.getTemporaryPath().toString() + "/bulk_test_diff_files_89723987592_" + getUniqueNames(1)[0];
+ fs.delete(new Path(dir), true);
+
+ // three files that together hold rows 0 through 1999 (bounds inclusive)
+ writeRFile(dir + "/f1." + RFile.EXTENSION, fs, hadoopConf, accumuloConf, 0, 333);
+ writeRFile(dir + "/f2." + RFile.EXTENSION, fs, hadoopConf, accumuloConf, 334, 999);
+ writeRFile(dir + "/f3." + RFile.EXTENSION, fs, hadoopConf, accumuloConf, 1000, 1999);
+
+ // import the directory, then check file counts and contents
+ FunctionalTestUtils.bulkImport(conn, fs, table, dir);
+
+ FunctionalTestUtils.checkRFiles(conn, table, 6, 6, 1, 1);
+
+ verifyData(table, 0, 1999);
+ }
+
+ /** Writes rows {@code s} through {@code e} (inclusive) to a new file at {@code file}, in the default locality group. */
+ private void writeRFile(String file, FileSystem fs, Configuration conf, AccumuloConfiguration aconf, int s, int e) throws Exception {
+ final FileSKVWriter writer = FileOperations.getInstance().openWriter(file, fs, conf, aconf);
+ writer.startDefaultLocalityGroup();
+ writeData(writer, s, e);
+ writer.close();
+ }
+
+ private void verifyData(String table, int s, int e) throws Exception {
+ Scanner scanner = getConnector().createScanner(table, Authorizations.EMPTY);
+
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+ for (int i = s; i <= e; i++) {
+ if (!iter.hasNext())
+ throw new Exception("row " + i + " not found");
+
+ Entry<Key,Value> entry = iter.next();
+
+ String row = String.format("%04d", i);
+
+ if (!entry.getKey().getRow().equals(new Text(row)))
+ throw new Exception("unexpected row " + entry.getKey() + " " + i);
+
+ if (Integer.parseInt(entry.getValue().toString()) != i)
+ throw new Exception("unexpected value " + entry + " " + i);
+ }
+
+ if (iter.hasNext())
+ throw new Exception("found more than expected " + iter.next());
+ }
+
+ /** Appends one entry per number in {@code s}..{@code e} (inclusive): row is the number zero-padded to four digits, value is the number. */
+ private void writeData(FileSKVWriter w, int s, int e) throws Exception {
+ for (int rowNum = s; rowNum <= e; rowNum++)
+ w.append(new Key(new Text(String.format("%04d", rowNum))), new Value(Integer.toString(rowNum).getBytes(UTF_8)));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
new file mode 100644
index 0000000..f60724e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.TestIngest.Opts;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Bulk import integration test: writes {@code COUNT} RFiles of {@code N} rows each, plus one RFile holding a single entry, using {@link TestIngest}, imports
+ * the directory into a newly created table and verifies the rows with {@link VerifyIngest}.
+ */
+ public class BulkIT extends AccumuloClusterHarness {
+
+   // rows per generated RFile
+   private static final int N = 100000;
+   // number of full-size RFiles generated
+   private static final int COUNT = 5;
+   private static final BatchWriterOpts BWOPTS = new BatchWriterOpts();
+   private static final ScannerOpts SOPTS = new ScannerOpts();
+
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 4 * 60;
+   }
+
+   // CachedConfiguration instance seen before the test; runTest replaces it, so it is put back in restoreConf
+   private Configuration origConf;
+
+   /** Remembers the current {@link CachedConfiguration} instance so it can be restored after the test. */
+   @Before
+   public void saveConf() {
+     origConf = CachedConfiguration.getInstance();
+   }
+
+   /** Restores the {@link CachedConfiguration} instance captured by {@link #saveConf()}, if any. */
+   @After
+   public void restoreConf() {
+     if (null != origConf) {
+       CachedConfiguration.setInstance(origConf);
+     }
+   }
+
+   @Test
+   public void test() throws Exception {
+     runTest(getConnector(), getCluster().getFileSystem(), getCluster().getTemporaryPath(), getAdminPrincipal(), getUniqueNames(1)[0],
+         this.getClass().getName(), testName.getMethodName());
+   }
+
+   /**
+    * Creates {@code tableName}, generates RFiles below {@code basePath}, bulk imports them and verifies the imported rows.
+    *
+    * @param c
+    *          connector used to create the table, import the files and verify
+    * @param fs
+    *          file system the RFiles are written to; its configuration is installed as the {@link CachedConfiguration} instance
+    * @param basePath
+    *          parent of the working directory; {@code testBulkFail_<dirSuffix>} below it is deleted and recreated
+    * @param principal
+    *          principal set on the verification options
+    * @param tableName
+    *          table to create and import into
+    * @param filePrefix
+    *          prefix of every generated RFile name
+    * @param dirSuffix
+    *          suffix of the working directory name
+    */
+   static void runTest(Connector c, FileSystem fs, Path basePath, String principal, String tableName, String filePrefix, String dirSuffix) throws Exception {
+     c.tableOperations().create(tableName);
+     CachedConfiguration.setInstance(fs.getConf());
+
+     // start from an empty working directory with separate "files" (to import) and "failures" sub-directories
+     Path base = new Path(basePath, "testBulkFail_" + dirSuffix);
+     fs.delete(base, true);
+     fs.mkdirs(base);
+     Path bulkFailures = new Path(base, "failures");
+     Path files = new Path(base, "files");
+     fs.mkdirs(bulkFailures);
+     fs.mkdirs(files);
+
+     Opts opts = new Opts();
+     opts.timestamp = 1;
+     opts.random = 56;
+     opts.rows = N;
+     opts.instance = c.getInstance().getInstanceName();
+     opts.cols = 1;
+     opts.setTableName(tableName);
+     opts.conf = CachedConfiguration.getInstance();
+     opts.fs = fs;
+     String fileFormat = filePrefix + "rf%02d";
+     for (int i = 0; i < COUNT; i++) {
+       opts.outputFile = new Path(files, String.format(fileFormat, i)).toString();
+       opts.startRow = N * i;
+       TestIngest.ingest(c, opts, BWOPTS);
+     }
+     // Resolve the single-entry file against the import directory like the files above. Appending the name directly to "base" yields a sibling of the
+     // working directory, which is neither imported, nor covered by the chmod below, nor removed when "base" is deleted.
+     opts.outputFile = new Path(files, String.format(fileFormat, N)).toString();
+     opts.startRow = N;
+     opts.rows = 1;
+     // create an rfile with one entry, there was a bug with this:
+     TestIngest.ingest(c, opts, BWOPTS);
+
+     // Make sure the server can modify the files
+     FsShell fsShell = new FsShell(fs.getConf());
+     Assert.assertEquals("Failed to chmod " + base.toString(), 0, fsShell.run(new String[] {"-chmod", "-R", "777", base.toString()}));
+
+     c.tableOperations().importDirectory(tableName, files.toString(), bulkFailures.toString(), false);
+     VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+     vopts.setTableName(tableName);
+     vopts.random = 56;
+     vopts.setPrincipal(principal);
+     // verify each generated range of N rows
+     for (int i = 0; i < COUNT; i++) {
+       vopts.startRow = i * N;
+       vopts.rows = N;
+       VerifyIngest.verifyIngest(c, vopts, SOPTS);
+     }
+     // verify the row written by the single-entry file
+     vopts.startRow = N;
+     vopts.rows = 1;
+     VerifyIngest.verifyIngest(c, vopts, SOPTS);
+   }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
new file mode 100644
index 0000000..74d3e96
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import org.apache.accumulo.core.cli.ClientOpts.Password;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Verifies that when many files are bulk imported into a table with a single tablet and that tablet then splits, not every map file ends up assigned to every
+ * child tablet.
+ */
+ public class BulkSplitOptimizationIT extends AccumuloClusterHarness {
+
+   // major compaction delay the test runs with
+   private static final String MAJC_DELAY = "1s";
+
+   @Override
+   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+     cfg.setProperty(Property.TSERV_MAJC_DELAY, MAJC_DELAY);
+   }
+
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 2 * 60;
+   }
+
+   // value of TSERV_MAJC_DELAY read from the system configuration before the test touched it
+   private String majcDelay;
+
+   /**
+    * Records the current major compaction delay and, when it differs from the value the test needs, sets it and restarts the tablet servers.
+    */
+   @Before
+   public void alterConfig() throws Exception {
+     Connector conn = getConnector();
+     majcDelay = conn.instanceOperations().getSystemConfiguration().get(Property.TSERV_MAJC_DELAY.getKey());
+     if (!MAJC_DELAY.equals(majcDelay)) {
+       conn.instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), MAJC_DELAY);
+       restartTabletServers();
+     }
+   }
+
+   /**
+    * Puts back the major compaction delay recorded by {@link #alterConfig()} and restarts the tablet servers. Does nothing when no value was recorded or when
+    * the recorded value already matched, because {@link #alterConfig()} changed nothing in that case and a restart would be wasted work.
+    */
+   @After
+   public void resetConfig() throws Exception {
+     if (null == majcDelay || MAJC_DELAY.equals(majcDelay)) {
+       return;
+     }
+     Connector conn = getConnector();
+     conn.instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay);
+     restartTabletServers();
+   }
+
+   /** Stops and then starts all tablet servers. */
+   private void restartTabletServers() throws Exception {
+     getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+     getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+   }
+
+   static final int ROWS = 100000;
+   static final int SPLITS = 99;
+
+   @Test
+   public void testBulkSplitOptimization() throws Exception {
+     final Connector c = getConnector();
+     final String tableName = getUniqueNames(1)[0];
+     c.tableOperations().create(tableName);
+     // high compaction ratio / file limit and a 1G split threshold while the files are being imported
+     c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1000");
+     c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1000");
+     c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
+
+     FileSystem fs = getFileSystem();
+     Path testDir = new Path(getUsableDir(), "testmf");
+     FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, SPLITS, 8);
+     FileStatus[] stats = fs.listStatus(testDir);
+
+     System.out.println("Number of generated files: " + stats.length);
+     FunctionalTestUtils.bulkImport(c, fs, tableName, testDir.toString());
+     FunctionalTestUtils.checkSplits(c, tableName, 0, 0);
+     FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100);
+
+     // initiate splits
+     c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K");
+
+     UtilWaitThread.sleep(2000);
+
+     // wait until over split threshold -- should be 78 splits
+     while (c.tableOperations().listSplits(tableName).size() < 75) {
+       UtilWaitThread.sleep(500);
+     }
+
+     FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
+     VerifyIngest.Opts opts = new VerifyIngest.Opts();
+     opts.timestamp = 1;
+     opts.dataSize = 50;
+     opts.random = 56;
+     // verify every row that createRFiles was asked to generate
+     opts.rows = ROWS;
+     opts.startRow = 0;
+     opts.cols = 1;
+     opts.setTableName(tableName);
+
+     // pass the admin credentials on in whichever form the cluster uses
+     AuthenticationToken adminToken = getAdminToken();
+     if (adminToken instanceof PasswordToken) {
+       PasswordToken token = (PasswordToken) adminToken;
+       opts.setPassword(new Password(new String(token.getPassword(), UTF_8)));
+       opts.setPrincipal(getAdminPrincipal());
+     } else if (adminToken instanceof KerberosToken) {
+       ClientConfiguration clientConf = cluster.getClientConfig();
+       opts.updateKerberosCredentials(clientConf);
+     } else {
+       Assert.fail("Unknown token type");
+     }
+
+     VerifyIngest.verifyIngest(c, opts, new ScannerOpts());
+
+     // ensure each tablet does not have all map files, should be ~2.5 files per tablet
+     FunctionalTestUtils.checkRFiles(c, tableName, 50, 100, 1, 4);
+   }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java
new file mode 100644
index 0000000..4055c3a
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ChaoticBalancerIT.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.master.balancer.ChaoticLoadBalancer;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ * Ingests into and verifies a table that is configured to use the {@link ChaoticLoadBalancer} and a 10K split threshold, while a second table pre-split on
+ * 100 points exists but is never written to.
+ */
+ public class ChaoticBalancerIT extends AccumuloClusterHarness {
+
+   @Override
+   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+     Map<String,String> site = cfg.getSiteConfig();
+     site.put(Property.TSERV_MAXMEM.getKey(), "10K");
+     site.put(Property.TSERV_MAJC_DELAY.getKey(), "0");
+     cfg.setSiteConfig(site);
+   }
+
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 4 * 60;
+   }
+
+   @Test
+   public void test() throws Exception {
+     final Connector conn = getConnector();
+     final String[] tables = getUniqueNames(2);
+     final String ingestTable = tables[0];
+     final String splitTable = tables[1];
+
+     // table under test: chaotic balancer plus a small split threshold
+     conn.tableOperations().create(ingestTable);
+     conn.tableOperations().setProperty(ingestTable, Property.TABLE_LOAD_BALANCER.getKey(), ChaoticLoadBalancer.class.getName());
+     conn.tableOperations().setProperty(ingestTable, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+
+     // second table: only created and split, no data is written to it
+     SortedSet<Text> splitPoints = new TreeSet<Text>();
+     int n = 0;
+     while (n < 100) {
+       splitPoints.add(new Text(String.format("%03d", n)));
+       n++;
+     }
+     conn.tableOperations().create(splitTable);
+     conn.tableOperations().addSplits(splitTable, splitPoints);
+
+     TestIngest.Opts ingestOpts = new TestIngest.Opts();
+     VerifyIngest.Opts verifyOpts = new VerifyIngest.Opts();
+     ingestOpts.rows = 20000;
+     verifyOpts.rows = 20000;
+     ingestOpts.setTableName(ingestTable);
+     verifyOpts.setTableName(ingestTable);
+
+     // use Kerberos credentials when SASL is enabled in the client configuration, the admin principal otherwise
+     ClientConfiguration clientConfig = getCluster().getClientConfig();
+     boolean saslEnabled = clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false);
+     if (!saslEnabled) {
+       ingestOpts.setPrincipal(getAdminPrincipal());
+       verifyOpts.setPrincipal(getAdminPrincipal());
+     } else {
+       ingestOpts.updateKerberosCredentials(clientConfig);
+       verifyOpts.updateKerberosCredentials(clientConfig);
+     }
+
+     TestIngest.ingest(conn, ingestOpts, new BatchWriterOpts());
+     conn.tableOperations().flush(ingestTable, null, null, true);
+     VerifyIngest.verifyIngest(conn, verifyOpts, new ScannerOpts());
+   }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java
new file mode 100644
index 0000000..c06feed
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ClassLoaderIT.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Combiner;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Copies combiner jars into the mini cluster's {@code lib/ext} directory and checks, by scanning a one-entry table, which value is returned before and after
+ * each jar is installed. Only runs against a mini cluster.
+ */
+ public class ClassLoaderIT extends AccumuloClusterHarness {
+
+   // time waited after attaching the iterator before scanning again
+   private static final long ZOOKEEPER_PROPAGATION_TIME = 10 * 1000;
+
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 2 * 60;
+   }
+
+   // absolute path of the mini cluster's directory, set in checkCluster
+   private String rootPath;
+
+   /** Skips the test unless the cluster is a mini cluster and records the cluster's directory. */
+   @Before
+   public void checkCluster() {
+     Assume.assumeThat(getClusterType(), CoreMatchers.is(ClusterType.MINI));
+     MiniAccumuloClusterImpl miniCluster = (MiniAccumuloClusterImpl) getCluster();
+     rootPath = miniCluster.getConfig().getDir().getAbsolutePath();
+   }
+
+   @Test
+   public void test() throws Exception {
+     final Connector conn = getConnector();
+     final String table = getUniqueNames(1)[0];
+     conn.tableOperations().create(table);
+
+     // a single entry whose raw value is "Test"
+     BatchWriter writer = conn.createBatchWriter(table, new BatchWriterConfig());
+     Mutation mutation = new Mutation("row1");
+     mutation.put("cf", "col1", "Test");
+     writer.addMutation(mutation);
+     writer.close();
+     scanCheck(conn, table, "Test");
+
+     // install the first jar and attach the combiner it provides at scan scope
+     FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+     Path extJar = new Path(rootPath + "/lib/ext/Test.jar");
+     Path combinerX = new Path(System.getProperty("user.dir") + "/src/test/resources/TestCombinerX.jar");
+     fs.copyFromLocalFile(combinerX, extJar);
+     UtilWaitThread.sleep(1000);
+     IteratorSetting combiner = new IteratorSetting(10, "TestCombiner", "org.apache.accumulo.test.functional.TestCombiner");
+     Combiner.setColumns(combiner, Collections.singletonList(new IteratorSetting.Column("cf")));
+     conn.tableOperations().attachIterator(table, combiner, EnumSet.of(IteratorScope.scan));
+     UtilWaitThread.sleep(ZOOKEEPER_PROPAGATION_TIME);
+     scanCheck(conn, table, "TestX");
+
+     // swap in the second jar under the same name and expect the scan result to follow
+     fs.delete(extJar, true);
+     Path combinerY = new Path(System.getProperty("user.dir") + "/src/test/resources/TestCombinerY.jar");
+     fs.copyFromLocalFile(combinerY, extJar);
+     UtilWaitThread.sleep(5000);
+     scanCheck(conn, table, "TestY");
+     fs.delete(extJar, true);
+   }
+
+   /**
+    * Asserts that scanning {@code tableName} with empty authorizations returns exactly one entry and that its value equals {@code expected}.
+    */
+   private void scanCheck(Connector c, String tableName, String expected) throws Exception {
+     Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
+     Iterator<Entry<Key,Value>> entries = scanner.iterator();
+     assertTrue(entries.hasNext());
+     Entry<Key,Value> only = entries.next();
+     assertFalse(entries.hasNext());
+     String actual = only.getValue().toString();
+     assertEquals(expected, actual);
+   }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/CleanTmpIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CleanTmpIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CleanTmpIT.java
new file mode 100644
index 0000000..779b407
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CleanTmpIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+
+/**
+ * Creates a stray {@code junk.rf_tmp} file in a tablet directory, restarts the cluster and asserts that the table data is still readable and that the stray
+ * file has been removed.
+ */
+ public class CleanTmpIT extends ConfigurableMacBase {
+ private static final Logger log = LoggerFactory.getLogger(CleanTmpIT.class);
+
+ /**
+ * Uses a 3s ZooKeeper timeout, a single tablet server and the raw local file system.
+ */
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
+ cfg.setNumTservers(1);
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 4 * 60;
+ }
+
+ /**
+ * Writes one row and compacts it into a file, writes a second row, drops an empty {@code junk.rf_tmp} next to the tablet's data file, restarts the cluster
+ * and then checks that both rows can be scanned and that the tmp file no longer exists.
+ */
+ @Test
+ public void test() throws Exception {
+ Connector c = getConnector();
+ // make a table
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+ // write to it
+ BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("row");
+ m.put("cf", "cq", "value");
+ bw.addMutation(m);
+ bw.flush();
+
+ // Compact memory to make a file
+ c.tableOperations().compact(tableName, null, null, true, true);
+
+ // Make sure that we'll have a WAL
+ m = new Mutation("row2");
+ m.put("cf", "cq", "value");
+ bw.addMutation(m);
+ bw.close();
+
+ // create a fake _tmp file in its directory
+ // the directory is found via the table's data file entry in the metadata table; exactly one such entry is expected
+ String id = c.tableOperations().tableIdMap().get(tableName);
+ Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(Range.prefix(id));
+ s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+ // the column qualifier of the entry is used as the path of the data file
+ Path file = new Path(entry.getKey().getColumnQualifier().toString());
+
+ FileSystem fs = getCluster().getFileSystem();
+ assertTrue("Could not find file: " + file, fs.exists(file));
+ Path tabletDir = file.getParent();
+ assertNotNull("Tablet dir should not be null", tabletDir);
+ Path tmp = new Path(tabletDir, "junk.rf_tmp");
+ // Make the file
+ fs.create(tmp).close();
+ log.info("Created tmp file {}", tmp.toString());
+ // restart the whole cluster
+ getCluster().stop();
+ getCluster().start();
+
+ // both rows written above ("row" and "row2") must be readable after the restart
+ Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
+ assertEquals(2, Iterators.size(scanner.iterator()));
+ // If we performed log recovery, we should have cleaned up any stray files
+ assertFalse("File still exists: " + tmp, fs.exists(tmp));
+ }
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/CleanUpIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CleanUpIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CleanUpIT.java
new file mode 100644
index 0000000..1f6d1a0
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CleanUpIT.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CleanUp;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ensures that all threads spawned for ZooKeeper and Thrift connectivity are reaped after calling CleanUp.shutdownNow().
+ *
+ * Because this is destructive across the current context classloader, the normal teardown methods will fail (because they attempt to create a Connector). Until
+ * the ZooKeeperInstance and Connector are self-contained WRT resource management, we can't leverage the AccumuloClusterBase.
+ */
+ public class CleanUpIT extends SharedMiniClusterBase {
+   private static final Logger log = LoggerFactory.getLogger(CleanUpIT.class);
+
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 30;
+   }
+
+   /**
+    * Writes and reads back one entry, calls {@link CleanUp#shutdownNow()}, then checks that the existing batch writer and scanner fail and that no
+    * ZooKeeper or Thrift pool threads (as identified by {@link #countThreads()}) remain.
+    */
+   @Test
+   public void run() throws Exception {
+
+     String tableName = getUniqueNames(1)[0];
+     getConnector().tableOperations().create(tableName);
+
+     BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig());
+
+     Mutation m1 = new Mutation("r1");
+     m1.put("cf1", "cq1", 1, "5");
+
+     bw.addMutation(m1);
+
+     bw.flush();
+
+     Scanner scanner = getConnector().createScanner(tableName, new Authorizations());
+
+     int count = 0;
+     for (Entry<Key,Value> entry : scanner) {
+       count++;
+       if (!entry.getValue().toString().equals("5")) {
+         Assert.fail("Unexpected value " + entry.getValue());
+       }
+     }
+
+     Assert.assertEquals("Unexpected count", 1, count);
+
+     // the client activity above should have left connectivity threads running
+     int threadCount = countThreads();
+     if (threadCount < 2) {
+       printThreadNames();
+       Assert.fail("Not seeing expected threads. Saw " + threadCount);
+     }
+
+     CleanUp.shutdownNow();
+
+     Mutation m2 = new Mutation("r2");
+     m2.put("cf1", "cq1", 1, "6");
+
+     // Assert.fail throws an AssertionError, which is not an Exception, so the catch blocks below only absorb the expected client failures
+     try {
+       bw.addMutation(m2);
+       bw.flush();
+       Assert.fail("batch writer did not fail");
+     } catch (Exception expected) {
+       // writing after shutdown is supposed to fail
+     }
+
+     try {
+       // expect this to fail also, want to clean up batch writer threads
+       bw.close();
+       Assert.fail("batch writer close did not fail");
+     } catch (Exception expected) {
+       // closing after shutdown is supposed to fail
+     }
+
+     try {
+       Iterator<Entry<Key,Value>> iter = scanner.iterator();
+       while (iter.hasNext()) {
+         iter.next();
+       }
+       Assert.fail("scanner did not fail");
+     } catch (Exception expected) {
+       // scanning after shutdown is supposed to fail
+     }
+
+     threadCount = countThreads();
+     if (threadCount > 0) {
+       printThreadNames();
+       Assert.fail("Threads did not go away. Saw " + threadCount);
+     }
+   }
+
+   /**
+    * Logs the name and current stack trace of every live thread, to help diagnose a failed thread count check.
+    */
+   private void printThreadNames() {
+     Set<Thread> threads = Thread.getAllStackTraces().keySet();
+     // a single Exception is reused purely as a carrier for each thread's stack trace
+     Exception e = new Exception();
+     for (Thread thread : threads) {
+       e.setStackTrace(thread.getStackTrace());
+       log.info("thread name: " + thread.getName(), e);
+     }
+   }
+
+   /**
+    * count threads that should be cleaned up
+    *
+    * A thread is counted when its lower-cased name contains "sendthread" or "eventthread", and counted (again) when it contains both "thrift" and "pool".
+    *
+    * @return number of matches among all live threads
+    */
+   private int countThreads() {
+     int count = 0;
+     for (Thread thread : Thread.getAllStackTraces().keySet()) {
+       String name = thread.getName().toLowerCase();
+
+       if (name.contains("sendthread") || name.contains("eventthread"))
+         count++;
+
+       if (name.contains("thrift") && name.contains("pool"))
+         count++;
+     }
+
+     return count;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/CloneTestIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CloneTestIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CloneTestIT.java
new file mode 100644
index 0000000..b3d0ab5
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CloneTestIT.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.cluster.AccumuloCluster;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.DiskUsage;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class CloneTestIT extends AccumuloClusterHarness {
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 2 * 60;
+ }
+
+ @Test
+ public void testProps() throws Exception {
+ String[] tableNames = getUniqueNames(2);
+ String table1 = tableNames[0];
+ String table2 = tableNames[1];
+
+ Connector c = getConnector();
+
+ c.tableOperations().create(table1);
+
+ c.tableOperations().setProperty(table1, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1M");
+ c.tableOperations().setProperty(table1, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), "2M");
+ c.tableOperations().setProperty(table1, Property.TABLE_FILE_MAX.getKey(), "23");
+
+ BatchWriter bw = writeData(table1, c);
+
+ Map<String,String> props = new HashMap<String,String>();
+ props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "500K");
+
+ Set<String> exclude = new HashSet<String>();
+ exclude.add(Property.TABLE_FILE_MAX.getKey());
+
+ c.tableOperations().clone(table1, table2, true, props, exclude);
+
+ Mutation m3 = new Mutation("009");
+ m3.put("data", "x", "1");
+ m3.put("data", "y", "2");
+ bw.addMutation(m3);
+ bw.close();
+
+ checkData(table2, c);
+
+ checkMetadata(table2, c);
+
+ HashMap<String,String> tableProps = new HashMap<String,String>();
+ for (Entry<String,String> prop : c.tableOperations().getProperties(table2)) {
+ tableProps.put(prop.getKey(), prop.getValue());
+ }
+
+ Assert.assertEquals("500K", tableProps.get(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey()));
+ Assert.assertEquals(Property.TABLE_FILE_MAX.getDefaultValue(), tableProps.get(Property.TABLE_FILE_MAX.getKey()));
+ Assert.assertEquals("2M", tableProps.get(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey()));
+
+ c.tableOperations().delete(table1);
+ c.tableOperations().delete(table2);
+
+ }
+
+ private void checkData(String table2, Connector c) throws TableNotFoundException {
+ Scanner scanner = c.createScanner(table2, Authorizations.EMPTY);
+
+ HashMap<String,String> expected = new HashMap<String,String>();
+ expected.put("001:x", "9");
+ expected.put("001:y", "7");
+ expected.put("008:x", "3");
+ expected.put("008:y", "4");
+
+ HashMap<String,String> actual = new HashMap<String,String>();
+
+ for (Entry<Key,Value> entry : scanner)
+ actual.put(entry.getKey().getRowData().toString() + ":" + entry.getKey().getColumnQualifierData().toString(), entry.getValue().toString());
+
+ Assert.assertEquals(expected, actual);
+ }
+
+ private void checkMetadata(String table, Connector conn) throws Exception {
+ Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+
+ s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(s);
+ String tableId = conn.tableOperations().tableIdMap().get(table);
+
+ Assert.assertNotNull("Could not get table id for " + table, tableId);
+
+ s.setRange(Range.prefix(tableId));
+
+ Key k;
+ Text cf = new Text(), cq = new Text();
+ int itemsInspected = 0;
+ for (Entry<Key,Value> entry : s) {
+ itemsInspected++;
+ k = entry.getKey();
+ k.getColumnFamily(cf);
+ k.getColumnQualifier(cq);
+
+ if (cf.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) {
+ Path p = new Path(cq.toString());
+ FileSystem fs = cluster.getFileSystem();
+ Assert.assertTrue("File does not exist: " + p, fs.exists(p));
+ } else if (cf.equals(MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily())) {
+ Assert.assertEquals("Saw unexpected cq", MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), cq);
+ Path tabletDir = new Path(entry.getValue().toString());
+ Path tableDir = tabletDir.getParent();
+ Path tablesDir = tableDir.getParent();
+
+ Assert.assertEquals(ServerConstants.TABLE_DIR, tablesDir.getName());
+ } else {
+ Assert.fail("Got unexpected key-value: " + entry);
+ throw new RuntimeException();
+ }
+ }
+
+ Assert.assertTrue("Expected to find metadata entries", itemsInspected > 0);
+ }
+
+ private BatchWriter writeData(String table1, Connector c) throws TableNotFoundException, MutationsRejectedException {
+ BatchWriter bw = c.createBatchWriter(table1, new BatchWriterConfig());
+
+ Mutation m1 = new Mutation("001");
+ m1.put("data", "x", "9");
+ m1.put("data", "y", "7");
+
+ Mutation m2 = new Mutation("008");
+ m2.put("data", "x", "3");
+ m2.put("data", "y", "4");
+
+ bw.addMutation(m1);
+ bw.addMutation(m2);
+
+ bw.flush();
+ return bw;
+ }
+
+ @Test
+ public void testDeleteClone() throws Exception {
+ String[] tableNames = getUniqueNames(3);
+ String table1 = tableNames[0];
+ String table2 = tableNames[1];
+ String table3 = tableNames[2];
+
+ Connector c = getConnector();
+ AccumuloCluster cluster = getCluster();
+ Assume.assumeTrue(cluster instanceof MiniAccumuloClusterImpl);
+ MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) cluster;
+ String rootPath = mac.getConfig().getDir().getAbsolutePath();
+
+ // verify that deleting a new table removes the files
+ c.tableOperations().create(table3);
+ writeData(table3, c).close();
+ c.tableOperations().flush(table3, null, null, true);
+ // check for files
+ FileSystem fs = getCluster().getFileSystem();
+ String id = c.tableOperations().tableIdMap().get(table3);
+ FileStatus[] status = fs.listStatus(new Path(rootPath + "/accumulo/tables/" + id));
+ assertTrue(status.length > 0);
+ // verify disk usage
+ List<DiskUsage> diskUsage = c.tableOperations().getDiskUsage(Collections.singleton(table3));
+ assertEquals(1, diskUsage.size());
+ assertTrue(diskUsage.get(0).getUsage() > 100);
+ // delete the table
+ c.tableOperations().delete(table3);
+ // verify its gone from the file system
+ Path tablePath = new Path(rootPath + "/accumulo/tables/" + id);
+ if (fs.exists(tablePath)) {
+ status = fs.listStatus(tablePath);
+ assertTrue(status == null || status.length == 0);
+ }
+
+ c.tableOperations().create(table1);
+
+ BatchWriter bw = writeData(table1, c);
+
+ Map<String,String> props = new HashMap<String,String>();
+ props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "500K");
+
+ Set<String> exclude = new HashSet<String>();
+ exclude.add(Property.TABLE_FILE_MAX.getKey());
+
+ c.tableOperations().clone(table1, table2, true, props, exclude);
+
+ Mutation m3 = new Mutation("009");
+ m3.put("data", "x", "1");
+ m3.put("data", "y", "2");
+ bw.addMutation(m3);
+ bw.close();
+
+ // delete source table, should not affect clone
+ c.tableOperations().delete(table1);
+
+ checkData(table2, c);
+
+ c.tableOperations().compact(table2, null, null, true, true);
+
+ checkData(table2, c);
+
+ c.tableOperations().delete(table2);
+
+ }
+
+ @Test
+ public void testCloneWithSplits() throws Exception {
+ Connector conn = getConnector();
+
+ List<Mutation> mutations = new ArrayList<Mutation>();
+ TreeSet<Text> splits = new TreeSet<Text>();
+ for (int i = 0; i < 10; i++) {
+ splits.add(new Text(Integer.toString(i)));
+ Mutation m = new Mutation(Integer.toString(i));
+ m.put("", "", "");
+ mutations.add(m);
+ }
+
+ String[] tables = getUniqueNames(2);
+
+ conn.tableOperations().create(tables[0]);
+
+ conn.tableOperations().addSplits(tables[0], splits);
+
+ BatchWriter bw = conn.createBatchWriter(tables[0], new BatchWriterConfig());
+ bw.addMutations(mutations);
+ bw.close();
+
+ conn.tableOperations().clone(tables[0], tables[1], true, null, null);
+
+ conn.tableOperations().deleteRows(tables[1], new Text("4"), new Text("8"));
+
+ List<String> rows = Arrays.asList("0", "1", "2", "3", "4", "9");
+ List<String> actualRows = new ArrayList<String>();
+ for (Entry<Key,Value> entry : conn.createScanner(tables[1], Authorizations.EMPTY)) {
+ actualRows.add(entry.getKey().getRow().toString());
+ }
+
+ Assert.assertEquals(rows, actualRows);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/CombinerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CombinerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CombinerIT.java
new file mode 100644
index 0000000..d4ef18e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CombinerIT.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.LongCombiner.Type;
+import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.junit.Test;
+
+ /**
+  * Integration test that attaches a {@link SummingCombiner} to a table and checks that ten values written
+  * to the same key are read back as a single summed value.
+  */
+ public class CombinerIT extends AccumuloClusterHarness {
+
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 60;
+   }
+
+   /**
+    * Asserts that a scan of {@code tableName} returns exactly one entry and that its value is "45", the sum
+    * of 0 through 9.
+    */
+   private void checkSum(String tableName, Connector c) throws Exception {
+     Scanner s = c.createScanner(tableName, Authorizations.EMPTY);
+     Iterator<Entry<Key,Value>> i = s.iterator();
+     assertTrue("Expected one combined entry in " + tableName, i.hasNext());
+     Entry<Key,Value> entry = i.next();
+     assertEquals("45", entry.getValue().toString());
+     assertFalse("Expected only one combined entry in " + tableName, i.hasNext());
+   }
+
+   /**
+    * Writes the values 0 through 9 to the same row and column of a table that has a string-encoded
+    * {@link SummingCombiner} on column family "cf", then verifies the combined result.
+    */
+   @Test
+   public void aggregationTest() throws Exception {
+     Connector c = getConnector();
+     String tableName = getUniqueNames(1)[0];
+     c.tableOperations().create(tableName);
+     IteratorSetting setting = new IteratorSetting(10, SummingCombiner.class);
+     SummingCombiner.setEncodingType(setting, Type.STRING);
+     SummingCombiner.setColumns(setting, Collections.singletonList(new IteratorSetting.Column("cf")));
+     c.tableOperations().attachIterator(tableName, setting);
+     BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+     for (int i = 0; i < 10; i++) {
+       Mutation m = new Mutation("row1");
+       // Pass character sequences instead of String.getBytes(), which would encode with the platform's
+       // default charset.
+       m.put("cf", "col1", Integer.toString(i));
+       bw.addMutation(m);
+     }
+     bw.close();
+     checkSum(tableName, c);
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
new file mode 100644
index 0000000..862365f
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.cli.ClientOpts.Password;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+
+ /**
+  * Integration test that bulk imports generated files into a table with a major compaction ratio of 1.0,
+  * repeatedly verifies the data from concurrent threads, and then asserts that the number of metadata
+  * entries counted by {@link #countFiles(Connector)} has dropped.
+  */
+ public class CompactionIT extends AccumuloClusterHarness {
+   private static final Logger log = LoggerFactory.getLogger(CompactionIT.class);
+
+   // Size arguments handed to FunctionalTestUtils.createRFiles. The verification threads walk the same row
+   // space in steps of ROWS / SPLITS rows.
+   private static final int ROWS = 500000;
+   private static final int SPLITS = 59;
+
+   @Override
+   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+     cfg.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN, "4");
+     cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
+     cfg.setProperty(Property.TSERV_MAJC_MAXCONCURRENT, "1");
+     // use raw local file system so walogs sync and flush will work
+     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+   }
+
+   @Override
+   protected int defaultTimeoutSeconds() {
+     return 4 * 60;
+   }
+
+   // Values saved by alterConfig() on a standalone cluster so that resetConfig() can restore them.
+   private String majcThreadMaxOpen, majcDelay, majcMaxConcurrent;
+
+   /**
+    * On a standalone cluster, saves the current major compaction settings, applies the same values that
+    * {@link #configureMiniCluster(MiniAccumuloConfigImpl, Configuration)} uses, and restarts the tablet
+    * servers.
+    */
+   @Before
+   public void alterConfig() throws Exception {
+     if (ClusterType.STANDALONE == getClusterType()) {
+       InstanceOperations iops = getConnector().instanceOperations();
+       Map<String,String> config = iops.getSystemConfiguration();
+       majcThreadMaxOpen = config.get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey());
+       majcDelay = config.get(Property.TSERV_MAJC_DELAY.getKey());
+       majcMaxConcurrent = config.get(Property.TSERV_MAJC_MAXCONCURRENT.getKey());
+
+       iops.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "4");
+       iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), "1");
+       iops.setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), "1");
+
+       getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+       getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+     }
+   }
+
+   /**
+    * Restores the settings saved by {@link #alterConfig()} and restarts the tablet servers. Does nothing
+    * when no value was saved.
+    */
+   @After
+   public void resetConfig() throws Exception {
+     // We set the values..
+     if (null != majcThreadMaxOpen) {
+       InstanceOperations iops = getConnector().instanceOperations();
+
+       iops.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), majcThreadMaxOpen);
+       iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay);
+       iops.setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), majcMaxConcurrent);
+
+       getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+       getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+     }
+   }
+
+   /**
+    * Bulk imports the generated files, runs five rounds of concurrent verification over the whole row
+    * space, and asserts that fewer metadata entries remain afterwards than right after the import. The
+    * cluster is stopped at the end, and restarted if it is a standalone cluster.
+    */
+   @Test
+   public void test() throws Exception {
+     final Connector c = getConnector();
+     final String tableName = getUniqueNames(1)[0];
+     c.tableOperations().create(tableName);
+     c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1.0");
+     FileSystem fs = getFileSystem();
+     Path root = new Path(cluster.getTemporaryPath(), getClass().getName());
+     Path testrf = new Path(root, "testrf");
+     FunctionalTestUtils.createRFiles(c, fs, testrf.toString(), ROWS, SPLITS, 4);
+
+     FunctionalTestUtils.bulkImport(c, fs, tableName, testrf.toString());
+     int beforeCount = countFiles(c);
+
+     final AtomicBoolean fail = new AtomicBoolean(false);
+     final ClientConfiguration clientConf = cluster.getClientConfig();
+     // number of rows each verification thread checks; also the step between thread start rows
+     final int span = ROWS / SPLITS;
+     for (int round = 0; round < 5; round++) {
+       List<Thread> threads = new ArrayList<Thread>();
+       for (int i = 0; i < ROWS; i += span) {
+         final int finalI = i;
+         Thread t = new Thread() {
+           @Override
+           public void run() {
+             try {
+               VerifyIngest.Opts opts = new VerifyIngest.Opts();
+               opts.startRow = finalI;
+               opts.rows = span;
+               opts.random = 56;
+               opts.dataSize = 50;
+               opts.cols = 1;
+               opts.setTableName(tableName);
+               if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+                 opts.updateKerberosCredentials(clientConf);
+               } else {
+                 opts.setPrincipal(getAdminPrincipal());
+                 PasswordToken passwordToken = (PasswordToken) getAdminToken();
+                 opts.setPassword(new Password(new String(passwordToken.getPassword(), UTF_8)));
+               }
+               VerifyIngest.verifyIngest(c, opts, new ScannerOpts());
+             } catch (Exception ex) {
+               // record the failure for the main thread; the assertion below reports it after join()
+               log.warn("Got exception verifying data", ex);
+               fail.set(true);
+             }
+           }
+         };
+         t.start();
+         threads.add(t);
+       }
+       for (Thread t : threads) {
+         t.join();
+       }
+       assertFalse("Failed to successfully run all threads, Check the test output for error", fail.get());
+     }
+
+     int finalCount = countFiles(c);
+     assertTrue("Expected fewer than " + beforeCount + " metadata entries, saw " + finalCount, finalCount < beforeCount);
+     try {
+       getClusterControl().adminStopAll();
+     } finally {
+       // Make sure the internal state in the cluster is reset (e.g. processes in MAC)
+       getCluster().stop();
+       if (ClusterType.STANDALONE == getClusterType()) {
+         // Then restart things for the next test if it's a standalone
+         getCluster().start();
+       }
+     }
+   }
+
+   /**
+    * Counts the metadata table entries in the tablet and data file column families.
+    *
+    * @return the number of entries returned by the scan
+    */
+   private int countFiles(Connector c) throws Exception {
+     Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+     s.fetchColumnFamily(MetadataSchema.TabletsSection.TabletColumnFamily.NAME);
+     s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+     return Iterators.size(s.iterator());
+   }
+
+ }