You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/06/04 20:53:18 UTC
[37/43] accumulo git commit: ACCUMULO-3871 move ITs into distro jar,
stop building test jar
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/SplitRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/SplitRecoveryIT.java
new file mode 100644
index 0000000..298c761
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/SplitRecoveryIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class SplitRecoveryIT extends AccumuloClusterHarness {
+
+ private Mutation m(String row) {
+ Mutation result = new Mutation(row);
+ result.put("cf", "cq", new Value("value".getBytes()));
+ return result;
+ }
+
+ boolean isOffline(String tablename, Connector connector) throws TableNotFoundException {
+ String tableId = connector.tableOperations().tableIdMap().get(tablename);
+ Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ scanner.setRange(new Range(new Text(tableId + ";"), new Text(tableId + "<")));
+ scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+ return Iterators.size(scanner.iterator()) == 0;
+ }
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Test
+ public void test() throws Exception {
+
+ String tableName = getUniqueNames(1)[0];
+
+ for (int tn = 0; tn < 2; tn++) {
+
+ Connector connector = getConnector();
+ // create a table and put some data in it
+ connector.tableOperations().create(tableName);
+ BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+ bw.addMutation(m("a"));
+ bw.addMutation(m("b"));
+ bw.addMutation(m("c"));
+ bw.close();
+ // take the table offline
+ connector.tableOperations().offline(tableName);
+ while (!isOffline(tableName, connector))
+ UtilWaitThread.sleep(200);
+
+ // poke a partial split into the metadata table
+ connector.securityOperations().grantTablePermission(getAdminPrincipal(), MetadataTable.NAME, TablePermission.WRITE);
+ String tableId = connector.tableOperations().tableIdMap().get(tableName);
+
+ KeyExtent extent = new KeyExtent(new Text(tableId), null, new Text("b"));
+ Mutation m = extent.getPrevRowUpdateMutation();
+
+ TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value(Double.toString(0.5).getBytes()));
+ TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(null));
+ bw = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ bw.addMutation(m);
+
+ if (tn == 1) {
+
+ bw.flush();
+
+ Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ scanner.setRange(extent.toMetadataRange());
+ scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+
+ KeyExtent extent2 = new KeyExtent(new Text(tableId), new Text("b"), null);
+ m = extent2.getPrevRowUpdateMutation();
+ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value("/t2".getBytes()));
+ TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value("M0".getBytes()));
+
+ for (Entry<Key,Value> entry : scanner) {
+ m.put(DataFileColumnFamily.NAME, entry.getKey().getColumnQualifier(), entry.getValue());
+ }
+
+ bw.addMutation(m);
+ }
+
+ bw.close();
+ // bring the table online
+ connector.tableOperations().online(tableName);
+
+ // verify the tablets went online
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ int i = 0;
+ String expected[] = {"a", "b", "c"};
+ for (Entry<Key,Value> entry : scanner) {
+ assertEquals(expected[i], entry.getKey().getRow().toString());
+ i++;
+ }
+ assertEquals(3, i);
+
+ connector.tableOperations().delete(tableName);
+
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/TableConfigurationUpdateIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/TableConfigurationUpdateIT.java b/test/src/main/java/org/apache/accumulo/test/TableConfigurationUpdateIT.java
new file mode 100644
index 0000000..1dd964c
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/TableConfigurationUpdateIT.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.Namespaces;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.server.conf.NamespaceConfiguration;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TableConfigurationUpdateIT extends AccumuloClusterHarness {
+ private static final Logger log = LoggerFactory.getLogger(TableConfigurationUpdateIT.class);
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Test
+ public void test() throws Exception {
+ Connector conn = getConnector();
+ Instance inst = conn.getInstance();
+
+ String table = getUniqueNames(1)[0];
+ conn.tableOperations().create(table);
+
+ final NamespaceConfiguration defaultConf = new NamespaceConfiguration(Namespaces.DEFAULT_NAMESPACE_ID, inst,
+ AccumuloConfiguration.getDefaultConfiguration());
+
+ // Cache invalidates 25% of the time
+ int randomMax = 4;
+ // Number of threads
+ int numThreads = 2;
+ // Number of iterations per thread
+ int iterations = 100000;
+ AccumuloConfiguration tableConf = new TableConfiguration(inst, table, defaultConf);
+
+ long start = System.currentTimeMillis();
+ ExecutorService svc = Executors.newFixedThreadPool(numThreads);
+ CountDownLatch countDown = new CountDownLatch(numThreads);
+ ArrayList<Future<Exception>> futures = new ArrayList<Future<Exception>>(numThreads);
+
+ for (int i = 0; i < numThreads; i++) {
+ futures.add(svc.submit(new TableConfRunner(randomMax, iterations, tableConf, countDown)));
+ }
+
+ svc.shutdown();
+ Assert.assertTrue(svc.awaitTermination(60, TimeUnit.MINUTES));
+
+ for (Future<Exception> fut : futures) {
+ Exception e = fut.get();
+ if (null != e) {
+ Assert.fail("Thread failed with exception " + e);
+ }
+ }
+
+ long end = System.currentTimeMillis();
+ log.debug(tableConf + " with " + iterations + " iterations and " + numThreads + " threads and cache invalidates " + ((1. / randomMax) * 100.) + "% took "
+ + (end - start) / 1000 + " second(s)");
+ }
+
+ public static class TableConfRunner implements Callable<Exception> {
+ private static final Property prop = Property.TABLE_SPLIT_THRESHOLD;
+ private AccumuloConfiguration tableConf;
+ private CountDownLatch countDown;
+ private int iterations, randMax;
+
+ public TableConfRunner(int randMax, int iterations, AccumuloConfiguration tableConf, CountDownLatch countDown) {
+ this.randMax = randMax;
+ this.iterations = iterations;
+ this.tableConf = tableConf;
+ this.countDown = countDown;
+ }
+
+ @Override
+ public Exception call() {
+ Random r = new Random();
+ countDown.countDown();
+ try {
+ countDown.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return e;
+ }
+
+ String t = Thread.currentThread().getName() + " ";
+ try {
+ for (int i = 0; i < iterations; i++) {
+ // if (i % 10000 == 0) {
+ // log.info(t + " " + i);
+ // }
+ int choice = r.nextInt(randMax);
+ if (choice < 1) {
+ tableConf.invalidateCache();
+ } else {
+ tableConf.get(prop);
+ }
+ }
+ } catch (Exception e) {
+ log.error(t, e);
+ return e;
+ }
+
+ return null;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java b/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
new file mode 100644
index 0000000..789b089
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.DiskUsage;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.constraints.DefaultKeySizeConstraint;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.functional.BadIterator;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class TableOperationsIT extends AccumuloClusterHarness {
+
+ static TabletClientService.Client client;
+
+ private Connector connector;
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 30;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ connector = getConnector();
+ }
+
+ @Test
+ public void getDiskUsageErrors() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+ List<DiskUsage> diskUsage = connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
+ assertEquals(1, diskUsage.size());
+ assertEquals(0, (long) diskUsage.get(0).getUsage());
+ assertEquals(tableName, diskUsage.get(0).getTables().iterator().next());
+
+ connector.securityOperations().revokeTablePermission(getAdminPrincipal(), tableName, TablePermission.READ);
+ try {
+ connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
+ fail("Should throw securityexception");
+ } catch (AccumuloSecurityException e) {}
+
+ connector.tableOperations().delete(tableName);
+ try {
+ connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
+ fail("Should throw tablenotfound");
+ } catch (TableNotFoundException e) {}
+ }
+
+ @Test
+ public void getDiskUsage() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TException {
+ final String[] names = getUniqueNames(2);
+ String tableName = names[0];
+ connector.tableOperations().create(tableName);
+
+ // verify 0 disk usage
+ List<DiskUsage> diskUsages = connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
+ assertEquals(1, diskUsages.size());
+ assertEquals(1, diskUsages.get(0).getTables().size());
+ assertEquals(Long.valueOf(0), diskUsages.get(0).getUsage());
+ assertEquals(tableName, diskUsages.get(0).getTables().first());
+
+ // add some data
+ BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("a");
+ m.put("b", "c", new Value("abcde".getBytes()));
+ bw.addMutation(m);
+ bw.flush();
+ bw.close();
+
+ connector.tableOperations().compact(tableName, new Text("A"), new Text("z"), true, true);
+
+ // verify we have usage
+ diskUsages = connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
+ assertEquals(1, diskUsages.size());
+ assertEquals(1, diskUsages.get(0).getTables().size());
+ assertTrue(diskUsages.get(0).getUsage() > 0);
+ assertEquals(tableName, diskUsages.get(0).getTables().first());
+
+ String newTable = names[1];
+
+ // clone table
+ connector.tableOperations().clone(tableName, newTable, false, null, null);
+
+ // verify tables are exactly the same
+ Set<String> tables = new HashSet<String>();
+ tables.add(tableName);
+ tables.add(newTable);
+ diskUsages = connector.tableOperations().getDiskUsage(tables);
+ assertEquals(1, diskUsages.size());
+ assertEquals(2, diskUsages.get(0).getTables().size());
+ assertTrue(diskUsages.get(0).getUsage() > 0);
+
+ connector.tableOperations().compact(tableName, new Text("A"), new Text("z"), true, true);
+ connector.tableOperations().compact(newTable, new Text("A"), new Text("z"), true, true);
+
+ // verify tables have differences
+ diskUsages = connector.tableOperations().getDiskUsage(tables);
+ assertEquals(2, diskUsages.size());
+ assertEquals(1, diskUsages.get(0).getTables().size());
+ assertEquals(1, diskUsages.get(1).getTables().size());
+ assertTrue(diskUsages.get(0).getUsage() > 0);
+ assertTrue(diskUsages.get(1).getUsage() > 0);
+
+ connector.tableOperations().delete(tableName);
+ }
+
+ @Test
+ public void createTable() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+ Iterable<Map.Entry<String,String>> itrProps = connector.tableOperations().getProperties(tableName);
+ Map<String,String> props = propsToMap(itrProps);
+ assertEquals(DefaultKeySizeConstraint.class.getName(), props.get(Property.TABLE_CONSTRAINT_PREFIX.toString() + "1"));
+ connector.tableOperations().delete(tableName);
+ }
+
+ @Test
+ public void createMergeClonedTable() throws Exception {
+ String[] names = getUniqueNames(2);
+ String originalTable = names[0];
+ TableOperations tops = connector.tableOperations();
+
+ TreeSet<Text> splits = Sets.newTreeSet(Arrays.asList(new Text("a"), new Text("b"), new Text("c"), new Text("d")));
+
+ tops.create(originalTable);
+ tops.addSplits(originalTable, splits);
+
+ BatchWriter bw = connector.createBatchWriter(originalTable, new BatchWriterConfig());
+ for (Text row : splits) {
+ Mutation m = new Mutation(row);
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ m.put(Integer.toString(i), Integer.toString(j), Integer.toString(i + j));
+ }
+ }
+
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ String clonedTable = names[1];
+
+ tops.clone(originalTable, clonedTable, true, null, null);
+ tops.merge(clonedTable, null, new Text("b"));
+
+ Map<String,Integer> rowCounts = Maps.newHashMap();
+ Scanner s = connector.createScanner(clonedTable, new Authorizations());
+ for (Entry<Key,Value> entry : s) {
+ final Key key = entry.getKey();
+ String row = key.getRow().toString();
+ String cf = key.getColumnFamily().toString(), cq = key.getColumnQualifier().toString();
+ String value = entry.getValue().toString();
+
+ if (rowCounts.containsKey(row)) {
+ rowCounts.put(row, rowCounts.get(row) + 1);
+ } else {
+ rowCounts.put(row, 1);
+ }
+
+ Assert.assertEquals(Integer.parseInt(cf) + Integer.parseInt(cq), Integer.parseInt(value));
+ }
+
+ Collection<Text> clonedSplits = tops.listSplits(clonedTable);
+ Set<Text> expectedSplits = Sets.newHashSet(new Text("b"), new Text("c"), new Text("d"));
+ for (Text clonedSplit : clonedSplits) {
+ Assert.assertTrue("Encountered unexpected split on the cloned table: " + clonedSplit, expectedSplits.remove(clonedSplit));
+ }
+
+ Assert.assertTrue("Did not find all expected splits on the cloned table: " + expectedSplits, expectedSplits.isEmpty());
+ }
+
+ private Map<String,String> propsToMap(Iterable<Map.Entry<String,String>> props) {
+ Map<String,String> map = new HashMap<String,String>();
+ for (Map.Entry<String,String> prop : props) {
+ map.put(prop.getKey(), prop.getValue());
+ }
+ return map;
+ }
+
+ @Test
+ public void testCompactEmptyTableWithGeneratorIterator() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+
+ List<IteratorSetting> list = new ArrayList<>();
+ list.add(new IteratorSetting(15, HardListIterator.class));
+ connector.tableOperations().compact(tableName, null, null, list, true, true);
+
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
+ for (Map.Entry<Key,Value> entry : scanner)
+ actual.put(entry.getKey(), entry.getValue());
+ assertEquals(HardListIterator.allEntriesToInject, actual);
+ connector.tableOperations().delete(tableName);
+ }
+
+ /** Compare only the row, column family and column qualifier. */
+ static class KeyRowColFColQComparator implements Comparator<Key> {
+ @Override
+ public int compare(Key k1, Key k2) {
+ return k1.compareTo(k2, PartialKey.ROW_COLFAM_COLQUAL);
+ }
+ }
+
+ static final KeyRowColFColQComparator COMPARE_KEY_TO_COLQ = new KeyRowColFColQComparator();
+
+ @Test
+ public void testCompactEmptyTableWithGeneratorIterator_Splits() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+ TableNotFoundException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+ SortedSet<Text> splitset = new TreeSet<>();
+ splitset.add(new Text("f"));
+ connector.tableOperations().addSplits(tableName, splitset);
+
+ List<IteratorSetting> list = new ArrayList<>();
+ list.add(new IteratorSetting(15, HardListIterator.class));
+ connector.tableOperations().compact(tableName, null, null, list, true, true);
+
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
+ for (Map.Entry<Key,Value> entry : scanner)
+ actual.put(entry.getKey(), entry.getValue());
+ assertEquals(HardListIterator.allEntriesToInject, actual);
+ connector.tableOperations().delete(tableName);
+ }
+
+ @Test
+ public void testCompactEmptyTableWithGeneratorIterator_Splits_Cancel() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+ TableNotFoundException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+ SortedSet<Text> splitset = new TreeSet<>();
+ splitset.add(new Text("f"));
+ connector.tableOperations().addSplits(tableName, splitset);
+
+ List<IteratorSetting> list = new ArrayList<>();
+ list.add(new IteratorSetting(15, HardListIterator.class));
+ connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block
+ connector.tableOperations().cancelCompaction(tableName);
+ // depending on timing, compaction will finish or be canceled
+
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
+ for (Map.Entry<Key,Value> entry : scanner)
+ actual.put(entry.getKey(), entry.getValue());
+ switch (actual.size()) {
+ case 3:
+ // Compaction cancel didn't happen in time
+ assertTrue(HardListIterator.allEntriesToInject.equals(actual));
+ break;
+ case 2:
+ // Compacted the first tablet (-inf, f)
+ assertEquals(HardListIterator.allEntriesToInject.headMap(new Key("f")), actual);
+ break;
+ case 1:
+ // Compacted the second tablet [f, +inf)
+ assertEquals(HardListIterator.allEntriesToInject.tailMap(new Key("f")), actual);
+ break;
+ case 0:
+ // Cancelled the compaction before it ran. No generated entries.
+ break;
+ default:
+ Assert.fail("Unexpected number of entries");
+ break;
+ }
+ connector.tableOperations().delete(tableName);
+ }
+
+ @Test
+ public void testCompactEmptyTableWithGeneratorIterator_Splits_Partial() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+ TableNotFoundException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+ Text splitRow = new Text("f");
+ SortedSet<Text> splitset = new TreeSet<>();
+ splitset.add(splitRow);
+ connector.tableOperations().addSplits(tableName, splitset);
+
+ List<IteratorSetting> list = new ArrayList<>();
+ list.add(new IteratorSetting(15, HardListIterator.class));
+ // compact the second tablet, not the first
+ connector.tableOperations().compact(tableName, splitRow, null, list, true, true);
+
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
+ for (Map.Entry<Key,Value> entry : scanner)
+ actual.put(entry.getKey(), entry.getValue());
+ // only expect the entries in the second tablet
+ assertEquals(HardListIterator.allEntriesToInject.tailMap(new Key(splitRow)), actual);
+ connector.tableOperations().delete(tableName);
+ }
+
+ /** Test recovery from bad majc iterator via compaction cancel. */
+ @Test
+ public void testCompactEmptyTablesWithBadIterator_FailsAndCancel() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+ TableNotFoundException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+
+ List<IteratorSetting> list = new ArrayList<>();
+ list.add(new IteratorSetting(15, BadIterator.class));
+ connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block
+ UtilWaitThread.sleep(2000); // start compaction
+ connector.tableOperations().cancelCompaction(tableName);
+
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ Map<Key,Value> actual = new TreeMap<>();
+ for (Map.Entry<Key,Value> entry : scanner)
+ actual.put(entry.getKey(), entry.getValue());
+ assertTrue("Should be empty. Actual is " + actual, actual.isEmpty());
+ connector.tableOperations().delete(tableName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java b/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
new file mode 100644
index 0000000..06bf394
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+// ACCUMULO-2480
+public class TabletServerGivesUpIT extends ConfigurableMacBase {
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.useMiniDFS(true);
+ cfg.setNumTservers(1);
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+ }
+
+ @Test(timeout = 30 * 1000)
+ public void test() throws Exception {
+ final Connector conn = this.getConnector();
+ // Yes, there's a tabletserver
+ assertEquals(1, conn.instanceOperations().getTabletServers().size());
+ final String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+ // Kill dfs
+ cluster.getMiniDfs().shutdown();
+ // ask the tserver to do something
+ final AtomicReference<Exception> ex = new AtomicReference<>();
+ Thread splitter = new Thread() {
+ @Override
+ public void run() {
+ try {
+ TreeSet<Text> splits = new TreeSet<>();
+ splits.add(new Text("X"));
+ conn.tableOperations().addSplits(tableName, splits);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ }
+ };
+ splitter.start();
+ // wait for the tserver to give up on writing to the WAL
+ while (conn.instanceOperations().getTabletServers().size() == 1) {
+ UtilWaitThread.sleep(1000);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java b/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
new file mode 100644
index 0000000..bf2e7f1
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+// see ACCUMULO-1950
+public class TotalQueuedIT extends ConfigurableMacBase {
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
+ cfg.setDefaultMemory(cfg.getDefaultMemory() * 2, MemoryUnit.BYTE);
+ cfg.useMiniDFS();
+ }
+
+ int SMALL_QUEUE_SIZE = 100000;
+ int LARGE_QUEUE_SIZE = SMALL_QUEUE_SIZE * 10;
+ static final long N = 1000000;
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void test() throws Exception {
+ Random random = new Random();
+ Connector c = getConnector();
+ c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + SMALL_QUEUE_SIZE);
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+ c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "9999");
+ c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "999");
+ UtilWaitThread.sleep(1000);
+ // get an idea of how fast the syncs occur
+ byte row[] = new byte[250];
+ BatchWriterConfig cfg = new BatchWriterConfig();
+ cfg.setMaxWriteThreads(10);
+ cfg.setMaxLatency(1, TimeUnit.SECONDS);
+ cfg.setMaxMemory(1024 * 1024);
+ long realSyncs = getSyncs();
+ BatchWriter bw = c.createBatchWriter(tableName, cfg);
+ long now = System.currentTimeMillis();
+ long bytesSent = 0;
+ for (int i = 0; i < N; i++) {
+ random.nextBytes(row);
+ Mutation m = new Mutation(row);
+ m.put("", "", "");
+ bw.addMutation(m);
+ bytesSent += m.estimatedMemoryUsed();
+ }
+ bw.close();
+ long diff = System.currentTimeMillis() - now;
+ double secs = diff / 1000.;
+ double syncs = bytesSent / SMALL_QUEUE_SIZE;
+ double syncsPerSec = syncs / secs;
+ System.out.println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", bytesSent, secs, ((long) syncs), syncsPerSec));
+ long update = getSyncs();
+ System.out.println("Syncs " + (update - realSyncs));
+ realSyncs = update;
+
+ // Now with a much bigger total queue
+ c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + LARGE_QUEUE_SIZE);
+ c.tableOperations().flush(tableName, null, null, true);
+ UtilWaitThread.sleep(1000);
+ bw = c.createBatchWriter(tableName, cfg);
+ now = System.currentTimeMillis();
+ bytesSent = 0;
+ for (int i = 0; i < N; i++) {
+ random.nextBytes(row);
+ Mutation m = new Mutation(row);
+ m.put("", "", "");
+ bw.addMutation(m);
+ bytesSent += m.estimatedMemoryUsed();
+ }
+ bw.close();
+ diff = System.currentTimeMillis() - now;
+ secs = diff / 1000.;
+ syncs = bytesSent / LARGE_QUEUE_SIZE;
+ syncsPerSec = syncs / secs;
+ System.out.println(String.format("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)", bytesSent, secs, ((long) syncs), syncsPerSec));
+ update = getSyncs();
+ System.out.println("Syncs " + (update - realSyncs));
+ assertTrue(update - realSyncs < realSyncs);
+ }
+
+ private long getSyncs() throws Exception {
+ Connector c = getConnector();
+ ServerConfigurationFactory confFactory = new ServerConfigurationFactory(c.getInstance());
+ AccumuloServerContext context = new AccumuloServerContext(confFactory);
+ for (String address : c.instanceOperations().getTabletServers()) {
+ TabletClientService.Client client = ThriftUtil.getTServerClient(HostAndPort.fromString(address), context);
+ TabletServerStatus status = client.getTabletServerStatus(null, context.rpcCreds());
+ return status.syncs;
+ }
+ return 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java
new file mode 100644
index 0000000..1c6e3df
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/TracerRecoversAfterOfflineTableIT.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.trace.DistributedTrace;
+import org.apache.accumulo.core.trace.Span;
+import org.apache.accumulo.core.trace.Trace;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.tracer.TraceDump;
+import org.apache.accumulo.tracer.TraceDump.Printer;
+import org.apache.accumulo.tracer.TraceServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class TracerRecoversAfterOfflineTableIT extends ConfigurableMacBase {
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+ cfg.setNumTservers(1);
+ }
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Test
+ public void test() throws Exception {
+ Process tracer = null;
+ Connector conn = getConnector();
+ if (!conn.tableOperations().exists("trace")) {
+ MiniAccumuloClusterImpl mac = cluster;
+ tracer = mac.exec(TraceServer.class);
+ while (!conn.tableOperations().exists("trace")) {
+ UtilWaitThread.sleep(1000);
+ }
+ UtilWaitThread.sleep(5000);
+ }
+
+ log.info("Taking table offline");
+ conn.tableOperations().offline("trace", true);
+
+ String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+
+ log.info("Start a distributed trace span");
+
+ DistributedTrace.enable("localhost", "testTrace", getClientConfig());
+ Span root = Trace.on("traceTest");
+ BatchWriter bw = conn.createBatchWriter(tableName, null);
+ Mutation m = new Mutation("m");
+ m.put("a", "b", "c");
+ bw.addMutation(m);
+ bw.close();
+ root.stop();
+
+ log.info("Bringing trace table back online");
+ conn.tableOperations().online("trace", true);
+
+ log.info("Trace table is online, should be able to find trace");
+
+ final Scanner scanner = conn.createScanner("trace", Authorizations.EMPTY);
+ scanner.setRange(new Range(new Text(Long.toHexString(root.traceId()))));
+ while (true) {
+ final StringBuffer finalBuffer = new StringBuffer();
+ int traceCount = TraceDump.printTrace(scanner, new Printer() {
+ @Override
+ public void print(final String line) {
+ try {
+ finalBuffer.append(line).append("\n");
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ });
+ String traceOutput = finalBuffer.toString();
+ log.info("Trace output:" + traceOutput);
+ if (traceCount > 0) {
+ int lastPos = 0;
+ for (String part : "traceTest,close,binMutations".split(",")) {
+ log.info("Looking in trace output for '" + part + "'");
+ int pos = traceOutput.indexOf(part);
+ assertTrue("Did not find '" + part + "' in output", pos > 0);
+ assertTrue("'" + part + "' occurred earlier than the previous element unexpectedly", pos > lastPos);
+ lastPos = pos;
+ }
+ break;
+ } else {
+ log.info("Ignoring trace output as traceCount not greater than zero: " + traceCount);
+ Thread.sleep(1000);
+ }
+ }
+ if (tracer != null) {
+ tracer.destroy();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
new file mode 100644
index 0000000..9cc3dc0
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.ThriftTransportKey;
+import org.apache.accumulo.core.client.impl.ThriftTransportPool;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that {@link ThriftTransportPool} actually adheres to the cachedConnection argument
+ */
+public class TransportCachingIT extends AccumuloClusterHarness {
+ private static final Logger log = LoggerFactory.getLogger(TransportCachingIT.class);
+
+ @Test
+ public void testCachedTransport() {
+ Connector conn = getConnector();
+ Instance instance = conn.getInstance();
+ ClientConfiguration clientConf = cluster.getClientConfig();
+ ClientContext context = new ClientContext(instance, new Credentials(getAdminPrincipal(), getAdminToken()), clientConf);
+ long rpcTimeout = DefaultConfiguration.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT.getDefaultValue());
+
+ // create list of servers
+ ArrayList<ThriftTransportKey> servers = new ArrayList<ThriftTransportKey>();
+
+ // add tservers
+ ZooCache zc = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
+ for (String tserver : zc.getChildren(ZooUtil.getRoot(instance) + Constants.ZTSERVERS)) {
+ String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + tserver;
+ byte[] data = ZooUtil.getLockData(zc, path);
+ if (data != null) {
+ String strData = new String(data, UTF_8);
+ if (!strData.equals("master"))
+ servers.add(new ThriftTransportKey(new ServerServices(strData).getAddress(Service.TSERV_CLIENT), rpcTimeout, context));
+ }
+ }
+
+ ThriftTransportPool pool = ThriftTransportPool.getInstance();
+ TTransport first = null;
+ while (null == first) {
+ try {
+ // Get a transport (cached or not)
+ first = pool.getAnyTransport(servers, true).getSecond();
+ } catch (TTransportException e) {
+ log.warn("Failed to obtain transport to " + servers);
+ }
+ }
+
+ assertNotNull(first);
+ // Return it to unreserve it
+ pool.returnTransport(first);
+
+ TTransport second = null;
+ while (null == second) {
+ try {
+ // Get a cached transport (should be the first)
+ second = pool.getAnyTransport(servers, true).getSecond();
+ } catch (TTransportException e) {
+ log.warn("Failed obtain 2nd transport to " + servers);
+ }
+ }
+
+ // We should get the same transport
+ assertTrue("Expected the first and second to be the same instance", first == second);
+ // Return the 2nd
+ pool.returnTransport(second);
+
+ TTransport third = null;
+ while (null == third) {
+ try {
+ // Get a non-cached transport
+ third = pool.getAnyTransport(servers, false).getSecond();
+ } catch (TTransportException e) {
+ log.warn("Failed obtain 2nd transport to " + servers);
+ }
+ }
+
+ assertFalse("Expected second and third transport to be different instances", second == third);
+ pool.returnTransport(third);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java b/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java
new file mode 100644
index 0000000..281c358
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+// When reviewing the changes for ACCUMULO-3423, kturner suggested
+// "tablets will now have log references that contain no data,
+// so it may be marked with 3 WALs, the first with data, the 2nd without, a 3rd with data.
+// It would be useful to have an IT that will test this situation.
+public class UnusedWALIT extends ConfigurableMacBase {
+
+ private ZooReaderWriter zk;
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ final long logSize = 1024 * 1024 * 10;
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+ cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, Long.toString(logSize));
+ cfg.setNumTservers(1);
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ hadoopCoreSite.set("fs.namenode.fs-limits.min-block-size", Long.toString(logSize));
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void test() throws Exception {
+ // don't want this bad boy cleaning up walog entries
+ getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+
+ // make two tables
+ String[] tableNames = getUniqueNames(2);
+ String bigTable = tableNames[0];
+ String lilTable = tableNames[1];
+ Connector c = getConnector();
+ c.tableOperations().create(bigTable);
+ c.tableOperations().create(lilTable);
+
+ Instance i = c.getInstance();
+ zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), "");
+
+ // put some data in a log that should be replayed for both tables
+ writeSomeData(c, bigTable, 0, 10, 0, 10);
+ scanSomeData(c, bigTable, 0, 10, 0, 10);
+ writeSomeData(c, lilTable, 0, 1, 0, 1);
+ scanSomeData(c, lilTable, 0, 1, 0, 1);
+ assertEquals(2, getWALCount(i, zk));
+
+ // roll the logs by pushing data into bigTable
+ writeSomeData(c, bigTable, 0, 3000, 0, 1000);
+ assertEquals(3, getWALCount(i, zk));
+
+ // put some data in the latest log
+ writeSomeData(c, lilTable, 1, 10, 0, 10);
+ scanSomeData(c, lilTable, 1, 10, 0, 10);
+
+ // bounce the tserver
+ getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
+ getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+
+ // wait for the metadata table to be online
+ Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+
+ // check our two sets of data in different logs
+ scanSomeData(c, lilTable, 0, 1, 0, 1);
+ scanSomeData(c, lilTable, 1, 10, 0, 10);
+ }
+
+ private void scanSomeData(Connector c, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception {
+ Scanner s = c.createScanner(table, Authorizations.EMPTY);
+ s.setRange(new Range(Integer.toHexString(startRow), Integer.toHexString(startRow + rowCount)));
+ int row = startRow;
+ int col = startCol;
+ for (Entry<Key,Value> entry : s) {
+ assertEquals(row, Integer.parseInt(entry.getKey().getRow().toString(), 16));
+ assertEquals(col++, Integer.parseInt(entry.getKey().getColumnQualifier().toString(), 16));
+ if (col == startCol + colCount) {
+ col = startCol;
+ row++;
+ if (row == startRow + rowCount) {
+ break;
+ }
+ }
+ }
+ assertEquals(row, startRow + rowCount);
+ }
+
+ private int getWALCount(Instance i, ZooReaderWriter zk) throws Exception {
+ WalStateManager wals = new WalStateManager(i, zk);
+ int result = 0;
+ for (Entry<TServerInstance,List<UUID>> entry : wals.getAllMarkers().entrySet()) {
+ result += entry.getValue().size();
+ }
+ return result;
+ }
+
+ private void writeSomeData(Connector conn, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception {
+ BatchWriterConfig config = new BatchWriterConfig();
+ config.setMaxMemory(10 * 1024 * 1024);
+ BatchWriter bw = conn.createBatchWriter(table, config);
+ for (int r = startRow; r < startRow + rowCount; r++) {
+ Mutation m = new Mutation(Integer.toHexString(r));
+ for (int c = startCol; c < startCol + colCount; c++) {
+ m.put("", Integer.toHexString(c), "");
+ }
+ bw.addMutation(m);
+ }
+ bw.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java b/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
new file mode 100644
index 0000000..fa9e642
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.functional.FunctionalTestUtils;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class UserCompactionStrategyIT extends AccumuloClusterHarness {
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 3 * 60;
+ }
+
+ @Test
+ public void testDropA() throws Exception {
+ Connector c = getConnector();
+
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+
+ writeFlush(c, tableName, "a");
+ writeFlush(c, tableName, "b");
+ // create a file that starts with A containing rows 'a' and 'b'
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+ writeFlush(c, tableName, "c");
+ writeFlush(c, tableName, "d");
+
+ // drop files that start with A
+ CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName());
+ csConfig.setOptions(ImmutableMap.of("dropPrefix", "A", "inputPrefix", "F"));
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+ Assert.assertEquals(ImmutableSet.of("c", "d"), getRows(c, tableName));
+
+ // this compaction should not drop files starting with A
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+ Assert.assertEquals(ImmutableSet.of("c", "d"), getRows(c, tableName));
+ }
+
+ private void testDropNone(Map<String,String> options) throws Exception {
+
+ Connector c = getConnector();
+
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+
+ writeFlush(c, tableName, "a");
+ writeFlush(c, tableName, "b");
+
+ CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName());
+ csConfig.setOptions(options);
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+ Assert.assertEquals(ImmutableSet.of("a", "b"), getRows(c, tableName));
+ }
+
+ @Test
+ public void testDropNone() throws Exception {
+ // test a compaction strategy that selects no files. In this case there is no work to do, want to ensure it does not hang.
+
+ testDropNone(ImmutableMap.of("inputPrefix", "Z"));
+ }
+
+ @Test
+ public void testDropNone2() throws Exception {
+ // test a compaction strategy that selects no files. This differs testDropNone() in that shouldCompact() will return true and getCompactionPlan() will
+ // return no work to do.
+
+ testDropNone(ImmutableMap.of("inputPrefix", "Z", "shouldCompact", "true"));
+ }
+
+ @Test
+ public void testPerTableClasspath() throws Exception {
+ // Can't assume that a test-resource will be on the server's classpath
+ Assume.assumeTrue(ClusterType.MINI == getClusterType());
+
+ // test pertable classpath + user specified compaction strat
+
+ final Connector c = getConnector();
+ final String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+ c.instanceOperations().setProperty(Property.VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "context1",
+ System.getProperty("user.dir") + "/src/test/resources/TestCompactionStrat.jar");
+ c.tableOperations().setProperty(tableName, Property.TABLE_CLASSPATH.getKey(), "context1");
+
+ c.tableOperations().addSplits(tableName, new TreeSet<Text>(Arrays.asList(new Text("efg"))));
+
+ writeFlush(c, tableName, "a");
+ writeFlush(c, tableName, "b");
+
+ writeFlush(c, tableName, "h");
+ writeFlush(c, tableName, "i");
+
+ Assert.assertEquals(4, FunctionalTestUtils.countRFiles(c, tableName));
+
+ // EfgCompactionStrat will only compact a tablet w/ end row of 'efg'. No other tablets are compacted.
+ CompactionStrategyConfig csConfig = new CompactionStrategyConfig("org.apache.accumulo.test.EfgCompactionStrat");
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+ Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName));
+
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+ Assert.assertEquals(2, FunctionalTestUtils.countRFiles(c, tableName));
+ }
+
+ @Test
+ public void testIterators() throws Exception {
+ // test compaction strategy + iterators
+
+ Connector c = getConnector();
+
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+
+ writeFlush(c, tableName, "a");
+ writeFlush(c, tableName, "b");
+ // create a file that starts with A containing rows 'a' and 'b'
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+ writeFlush(c, tableName, "c");
+ writeFlush(c, tableName, "d");
+
+ Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName));
+
+ // drop files that start with A
+ CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName());
+ csConfig.setOptions(ImmutableMap.of("inputPrefix", "F"));
+
+ IteratorSetting iterConf = new IteratorSetting(21, "myregex", RegExFilter.class);
+ RegExFilter.setRegexs(iterConf, "a|c", null, null, null, false);
+
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig).setIterators(Arrays.asList(iterConf)));
+
+ // compaction strategy should only be applied to one file. If its applied to both, then row 'b' would be dropped by filter.
+ Assert.assertEquals(ImmutableSet.of("a", "b", "c"), getRows(c, tableName));
+
+ Assert.assertEquals(2, FunctionalTestUtils.countRFiles(c, tableName));
+
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+ // ensure that iterator is not applied
+ Assert.assertEquals(ImmutableSet.of("a", "b", "c"), getRows(c, tableName));
+
+ Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName));
+ }
+
+ @Test
+ public void testFileSize() throws Exception {
+ Connector c = getConnector();
+
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+
+ // write random data because its very unlikely it will compress
+ writeRandomValue(c, tableName, 1 << 16);
+ writeRandomValue(c, tableName, 1 << 16);
+
+ writeRandomValue(c, tableName, 1 << 9);
+ writeRandomValue(c, tableName, 1 << 7);
+ writeRandomValue(c, tableName, 1 << 6);
+
+ Assert.assertEquals(5, FunctionalTestUtils.countRFiles(c, tableName));
+
+ CompactionStrategyConfig csConfig = new CompactionStrategyConfig(SizeCompactionStrategy.class.getName());
+ csConfig.setOptions(ImmutableMap.of("size", "" + (1 << 15)));
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+ Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName));
+
+ csConfig = new CompactionStrategyConfig(SizeCompactionStrategy.class.getName());
+ csConfig.setOptions(ImmutableMap.of("size", "" + (1 << 17)));
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+ Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName));
+
+ }
+
+ @Test
+ public void testConcurrent() throws Exception {
+ // two compactions without iterators or strategy should be able to run concurrently
+
+ Connector c = getConnector();
+
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+
+ // write random data because its very unlikely it will compress
+ writeRandomValue(c, tableName, 1 << 16);
+ writeRandomValue(c, tableName, 1 << 16);
+
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(false));
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+ Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName));
+
+ writeRandomValue(c, tableName, 1 << 16);
+
+ IteratorSetting iterConfig = new IteratorSetting(30, SlowIterator.class);
+ SlowIterator.setSleepTime(iterConfig, 1000);
+
+ long t1 = System.currentTimeMillis();
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(false).setIterators(Arrays.asList(iterConfig)));
+ try {
+ // this compaction should fail because previous one set iterators
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+ if (System.currentTimeMillis() - t1 < 2000)
+ Assert.fail("Expected compaction to fail because another concurrent compaction set iterators");
+ } catch (AccumuloException e) {}
+ }
+
+ void writeRandomValue(Connector c, String tableName, int size) throws Exception {
+ Random rand = new Random();
+
+ byte data1[] = new byte[size];
+ rand.nextBytes(data1);
+
+ BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+
+ Mutation m1 = new Mutation("r" + rand.nextInt(909090));
+ m1.put("data", "bl0b", new Value(data1));
+
+ bw.addMutation(m1);
+ bw.close();
+ c.tableOperations().flush(tableName, null, null, true);
+ }
+
+ private Set<String> getRows(Connector c, String tableName) throws TableNotFoundException {
+ Set<String> rows = new HashSet<String>();
+ Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
+
+ for (Entry<Key,Value> entry : scanner)
+ rows.add(entry.getKey().getRowData().toString());
+ return rows;
+
+ }
+
+ private void writeFlush(Connector conn, String tablename, String row) throws Exception {
+ BatchWriter bw = conn.createBatchWriter(tablename, new BatchWriterConfig());
+ Mutation m = new Mutation(row);
+ m.put("", "", "");
+ bw.addMutation(m);
+ bw.close();
+ conn.tableOperations().flush(tablename, null, null, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/UsersIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/UsersIT.java b/test/src/main/java/org/apache/accumulo/test/UsersIT.java
new file mode 100644
index 0000000..131f042
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/UsersIT.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Set;
+
+import org.apache.accumulo.cluster.ClusterUser;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.SecurityErrorCode;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.junit.Test;
+
+public class UsersIT extends AccumuloClusterHarness {
+
+ @Test
+ public void testCreateExistingUser() throws Exception {
+ ClusterUser user0 = getUser(0);
+ Connector conn = getConnector();
+ Set<String> currentUsers = conn.securityOperations().listLocalUsers();
+
+ // Ensure that the user exists
+ if (!currentUsers.contains(user0.getPrincipal())) {
+ PasswordToken token = null;
+ if (!getCluster().getClientConfig().getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+ token = new PasswordToken(user0.getPassword());
+ }
+ conn.securityOperations().createLocalUser(user0.getPrincipal(), token);
+ }
+
+ try {
+ conn.securityOperations().createLocalUser(user0.getPrincipal(), new PasswordToken("better_fail"));
+ fail("Creating a user that already exists should throw an exception");
+ } catch (AccumuloSecurityException e) {
+ assertTrue("Expected USER_EXISTS error", SecurityErrorCode.USER_EXISTS == e.getSecurityErrorCode());
+ String msg = e.getMessage();
+ assertTrue("Error message didn't contain principal: '" + msg + "'", msg.contains(user0.getPrincipal()));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
new file mode 100644
index 0000000..6a90730
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.server.util.Admin;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.test.functional.FunctionalTestUtils;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class VerifySerialRecoveryIT extends ConfigurableMacBase {
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
+ cfg.setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "20");
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testSerializedRecovery() throws Exception {
+ // make a table with many splits
+ String tableName = getUniqueNames(1)[0];
+ Connector c = getConnector();
+ c.tableOperations().create(tableName);
+ SortedSet<Text> splits = new TreeSet<Text>();
+ for (int i = 0; i < 200; i++) {
+ splits.add(new Text(AssignmentThreadsIT.randomHex(8)));
+ }
+ c.tableOperations().addSplits(tableName, splits);
+ // load data to give the recovery something to do
+ BatchWriter bw = c.createBatchWriter(tableName, null);
+ for (int i = 0; i < 50000; i++) {
+ Mutation m = new Mutation(AssignmentThreadsIT.randomHex(8));
+ m.put("", "", "");
+ bw.addMutation(m);
+ }
+ bw.close();
+ // kill the tserver
+ for (ProcessReference ref : getCluster().getProcesses().get(ServerType.TABLET_SERVER))
+ getCluster().killProcess(ServerType.TABLET_SERVER, ref);
+ final Process ts = cluster.exec(TabletServer.class);
+
+ // wait for recovery
+ Iterators.size(c.createScanner(tableName, Authorizations.EMPTY).iterator());
+ assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+ ts.waitFor();
+ String result = FunctionalTestUtils.readAll(cluster, TabletServer.class, ts);
+ for (String line : result.split("\n")) {
+ System.out.println(line);
+ }
+ // walk through the output, verifying that only a single normal recovery was running at one time
+ boolean started = false;
+ int recoveries = 0;
+ for (String line : result.split("\n")) {
+ // ignore metadata tables
+ if (line.contains("!0") || line.contains("+r"))
+ continue;
+ if (line.contains("Starting Write-Ahead Log")) {
+ assertFalse(started);
+ started = true;
+ recoveries++;
+ }
+ if (line.contains("Write-Ahead Log recovery complete")) {
+ assertTrue(started);
+ started = false;
+ }
+ }
+ assertFalse(started);
+ assertTrue(recoveries > 0);
+ }
+}