Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/01/30 20:25:19 UTC
[3/3] accumulo git commit: Merge branch '1.7' into 1.8
Merge branch '1.7' into 1.8
Conflicts:
test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1f31ca6c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1f31ca6c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1f31ca6c
Branch: refs/heads/1.8
Commit: 1f31ca6c24d33458c7582f9cf0257e9f58508007
Parents: bf5b6e0 6d8a5fa
Author: Keith Turner <kt...@apache.org>
Authored: Mon Jan 30 15:09:36 2017 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Mon Jan 30 15:09:36 2017 -0500
----------------------------------------------------------------------
.../org/apache/accumulo/fate/AdminUtil.java | 72 ++++++++++++++++----
.../master/tableOps/CancelCompactions.java | 2 +-
.../master/tableOps/FinishCancelCompaction.java | 12 +++-
.../apache/accumulo/test/TableOperationsIT.java | 7 ++
.../accumulo/test/UserCompactionStrategyIT.java | 6 ++
.../functional/ConcurrentDeleteTableIT.java | 34 +--------
.../test/functional/FateStarvationIT.java | 2 +
.../test/functional/FunctionalTestUtils.java | 30 ++++++++
.../accumulo/test/functional/RenameIT.java | 2 +
9 files changed, 118 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
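In short: this merge pulls forward the ACCUMULO-4575 test hardening from 1.7, moving the FATE-status helper out of ConcurrentDeleteTableIT into FunctionalTestUtils and having each of the ITs below assert, after a test runs, that no FATE transaction left a dangling table lock. A minimal sketch of the pattern (the class name ExampleIT is hypothetical; the harness and helper names are taken from the diff below):

import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.test.functional.FunctionalTestUtils;
import org.junit.After;

public class ExampleIT extends AccumuloClusterHarness {
  // Runs after each test; fails the test, printing the dangling locks,
  // if any FATE operation left a held or waiting table lock behind.
  @After
  public void checkForDanglingFateLocks() {
    FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
  }
}
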
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1f31ca6c/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
index d1a52fb,0000000..a83b0e2
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TableOperationsIT.java
@@@ -1,375 -1,0 +1,382 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.DiskUsage;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.constraints.DefaultKeySizeConstraint;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.functional.BadIterator;
++import org.apache.accumulo.test.functional.FunctionalTestUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
++import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+public class TableOperationsIT extends AccumuloClusterHarness {
+
+ static TabletClientService.Client client;
+
+ private Connector connector;
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 30;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ connector = getConnector();
+ }
+
++ @After
++ public void checkForDanglingFateLocks() {
++ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
++ }
++
+ @Test
+ public void getDiskUsageErrors() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+ List<DiskUsage> diskUsage = connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
+ assertEquals(1, diskUsage.size());
+ assertEquals(0, (long) diskUsage.get(0).getUsage());
+ assertEquals(tableName, diskUsage.get(0).getTables().iterator().next());
+
+ connector.securityOperations().revokeTablePermission(getAdminPrincipal(), tableName, TablePermission.READ);
+ try {
+ connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
+ fail("Should throw securityexception");
+ } catch (AccumuloSecurityException e) {}
+
+ connector.tableOperations().delete(tableName);
+ try {
+ connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
+ fail("Should throw tablenotfound");
+ } catch (TableNotFoundException e) {}
+ }
+
+ @Test
+ public void getDiskUsage() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException, TException {
+ final String[] names = getUniqueNames(2);
+ String tableName = names[0];
+ connector.tableOperations().create(tableName);
+
+ // verify 0 disk usage
+ List<DiskUsage> diskUsages = connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
+ assertEquals(1, diskUsages.size());
+ assertEquals(1, diskUsages.get(0).getTables().size());
+ assertEquals(Long.valueOf(0), diskUsages.get(0).getUsage());
+ assertEquals(tableName, diskUsages.get(0).getTables().first());
+
+ // add some data
+ BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("a");
+ m.put("b", "c", new Value("abcde".getBytes()));
+ bw.addMutation(m);
+ bw.flush();
+ bw.close();
+
+ connector.tableOperations().compact(tableName, new Text("A"), new Text("z"), true, true);
+
+ // verify we have usage
+ diskUsages = connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
+ assertEquals(1, diskUsages.size());
+ assertEquals(1, diskUsages.get(0).getTables().size());
+ assertTrue(diskUsages.get(0).getUsage() > 0);
+ assertEquals(tableName, diskUsages.get(0).getTables().first());
+
+ String newTable = names[1];
+
+ // clone table
+ connector.tableOperations().clone(tableName, newTable, false, null, null);
+
+ // verify tables are exactly the same
+ Set<String> tables = new HashSet<>();
+ tables.add(tableName);
+ tables.add(newTable);
+ diskUsages = connector.tableOperations().getDiskUsage(tables);
+ assertEquals(1, diskUsages.size());
+ assertEquals(2, diskUsages.get(0).getTables().size());
+ assertTrue(diskUsages.get(0).getUsage() > 0);
+
+ connector.tableOperations().compact(tableName, new Text("A"), new Text("z"), true, true);
+ connector.tableOperations().compact(newTable, new Text("A"), new Text("z"), true, true);
+
+ // verify tables have differences
+ diskUsages = connector.tableOperations().getDiskUsage(tables);
+ assertEquals(2, diskUsages.size());
+ assertEquals(1, diskUsages.get(0).getTables().size());
+ assertEquals(1, diskUsages.get(1).getTables().size());
+ assertTrue(diskUsages.get(0).getUsage() > 0);
+ assertTrue(diskUsages.get(1).getUsage() > 0);
+
+ connector.tableOperations().delete(tableName);
+ }
+
+ @Test
+ public void createTable() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+ Iterable<Map.Entry<String,String>> itrProps = connector.tableOperations().getProperties(tableName);
+ Map<String,String> props = propsToMap(itrProps);
+ assertEquals(DefaultKeySizeConstraint.class.getName(), props.get(Property.TABLE_CONSTRAINT_PREFIX.toString() + "1"));
+ connector.tableOperations().delete(tableName);
+ }
+
+ @Test
+ public void createMergeClonedTable() throws Exception {
+ String[] names = getUniqueNames(2);
+ String originalTable = names[0];
+ TableOperations tops = connector.tableOperations();
+
+ TreeSet<Text> splits = Sets.newTreeSet(Arrays.asList(new Text("a"), new Text("b"), new Text("c"), new Text("d")));
+
+ tops.create(originalTable);
+ tops.addSplits(originalTable, splits);
+
+ BatchWriter bw = connector.createBatchWriter(originalTable, new BatchWriterConfig());
+ for (Text row : splits) {
+ Mutation m = new Mutation(row);
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ m.put(Integer.toString(i), Integer.toString(j), Integer.toString(i + j));
+ }
+ }
+
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ String clonedTable = names[1];
+
+ tops.clone(originalTable, clonedTable, true, null, null);
+ tops.merge(clonedTable, null, new Text("b"));
+
+ Map<String,Integer> rowCounts = new HashMap<>();
+ Scanner s = connector.createScanner(clonedTable, new Authorizations());
+ for (Entry<Key,Value> entry : s) {
+ final Key key = entry.getKey();
+ String row = key.getRow().toString();
+ String cf = key.getColumnFamily().toString(), cq = key.getColumnQualifier().toString();
+ String value = entry.getValue().toString();
+
+ if (rowCounts.containsKey(row)) {
+ rowCounts.put(row, rowCounts.get(row) + 1);
+ } else {
+ rowCounts.put(row, 1);
+ }
+
+ Assert.assertEquals(Integer.parseInt(cf) + Integer.parseInt(cq), Integer.parseInt(value));
+ }
+
+ Collection<Text> clonedSplits = tops.listSplits(clonedTable);
+ Set<Text> expectedSplits = Sets.newHashSet(new Text("b"), new Text("c"), new Text("d"));
+ for (Text clonedSplit : clonedSplits) {
+ Assert.assertTrue("Encountered unexpected split on the cloned table: " + clonedSplit, expectedSplits.remove(clonedSplit));
+ }
+
+ Assert.assertTrue("Did not find all expected splits on the cloned table: " + expectedSplits, expectedSplits.isEmpty());
+ }
+
+ private Map<String,String> propsToMap(Iterable<Map.Entry<String,String>> props) {
+ Map<String,String> map = new HashMap<>();
+ for (Map.Entry<String,String> prop : props) {
+ map.put(prop.getKey(), prop.getValue());
+ }
+ return map;
+ }
+
+ @Test
+ public void testCompactEmptyTableWithGeneratorIterator() throws TableExistsException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+
+ List<IteratorSetting> list = new ArrayList<>();
+ list.add(new IteratorSetting(15, HardListIterator.class));
+ connector.tableOperations().compact(tableName, null, null, list, true, true);
+
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
+ for (Map.Entry<Key,Value> entry : scanner)
+ actual.put(entry.getKey(), entry.getValue());
+ assertEquals(HardListIterator.allEntriesToInject, actual);
+ connector.tableOperations().delete(tableName);
+ }
+
+ /** Compare only the row, column family and column qualifier. */
+ static class KeyRowColFColQComparator implements Comparator<Key> {
+ @Override
+ public int compare(Key k1, Key k2) {
+ return k1.compareTo(k2, PartialKey.ROW_COLFAM_COLQUAL);
+ }
+ }
+
+ static final KeyRowColFColQComparator COMPARE_KEY_TO_COLQ = new KeyRowColFColQComparator();
+
+ @Test
+ public void testCompactEmptyTableWithGeneratorIterator_Splits() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+ TableNotFoundException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+ SortedSet<Text> splitset = new TreeSet<>();
+ splitset.add(new Text("f"));
+ connector.tableOperations().addSplits(tableName, splitset);
+
+ List<IteratorSetting> list = new ArrayList<>();
+ list.add(new IteratorSetting(15, HardListIterator.class));
+ connector.tableOperations().compact(tableName, null, null, list, true, true);
+
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
+ for (Map.Entry<Key,Value> entry : scanner)
+ actual.put(entry.getKey(), entry.getValue());
+ assertEquals(HardListIterator.allEntriesToInject, actual);
+ connector.tableOperations().delete(tableName);
+ }
+
+ @Test
+ public void testCompactEmptyTableWithGeneratorIterator_Splits_Cancel() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+ TableNotFoundException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+ SortedSet<Text> splitset = new TreeSet<>();
+ splitset.add(new Text("f"));
+ connector.tableOperations().addSplits(tableName, splitset);
+
+ List<IteratorSetting> list = new ArrayList<>();
+ list.add(new IteratorSetting(15, HardListIterator.class));
+ connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block
+ connector.tableOperations().cancelCompaction(tableName);
+ // depending on timing, compaction will finish or be canceled
+
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
+ for (Map.Entry<Key,Value> entry : scanner)
+ actual.put(entry.getKey(), entry.getValue());
+ switch (actual.size()) {
+ case 3:
+ // Compaction cancel didn't happen in time
+ assertTrue(HardListIterator.allEntriesToInject.equals(actual));
+ break;
+ case 2:
+ // Compacted the first tablet (-inf, f)
+ assertEquals(HardListIterator.allEntriesToInject.headMap(new Key("f")), actual);
+ break;
+ case 1:
+ // Compacted the second tablet [f, +inf)
+ assertEquals(HardListIterator.allEntriesToInject.tailMap(new Key("f")), actual);
+ break;
+ case 0:
+ // Cancelled the compaction before it ran. No generated entries.
+ break;
+ default:
+ Assert.fail("Unexpected number of entries");
+ break;
+ }
+ connector.tableOperations().delete(tableName);
+ }
+
+ @Test
+ public void testCompactEmptyTableWithGeneratorIterator_Splits_Partial() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+ TableNotFoundException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+ Text splitRow = new Text("f");
+ SortedSet<Text> splitset = new TreeSet<>();
+ splitset.add(splitRow);
+ connector.tableOperations().addSplits(tableName, splitset);
+
+ List<IteratorSetting> list = new ArrayList<>();
+ list.add(new IteratorSetting(15, HardListIterator.class));
+ // compact the second tablet, not the first
+ connector.tableOperations().compact(tableName, splitRow, null, list, true, true);
+
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ Map<Key,Value> actual = new TreeMap<>(COMPARE_KEY_TO_COLQ); // only compare row, colF, colQ
+ for (Map.Entry<Key,Value> entry : scanner)
+ actual.put(entry.getKey(), entry.getValue());
+ // only expect the entries in the second tablet
+ assertEquals(HardListIterator.allEntriesToInject.tailMap(new Key(splitRow)), actual);
+ connector.tableOperations().delete(tableName);
+ }
+
+ /** Test recovery from bad majc iterator via compaction cancel. */
+ @Test
+ public void testCompactEmptyTablesWithBadIterator_FailsAndCancel() throws TableExistsException, AccumuloException, AccumuloSecurityException,
+ TableNotFoundException {
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+
+ List<IteratorSetting> list = new ArrayList<>();
+ list.add(new IteratorSetting(15, BadIterator.class));
+ connector.tableOperations().compact(tableName, null, null, list, true, false); // don't block
+ sleepUninterruptibly(2, TimeUnit.SECONDS); // start compaction
+ connector.tableOperations().cancelCompaction(tableName);
+
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ Map<Key,Value> actual = new TreeMap<>();
+ for (Map.Entry<Key,Value> entry : scanner)
+ actual.put(entry.getKey(), entry.getValue());
+ assertTrue("Should be empty. Actual is " + actual, actual.isEmpty());
+ connector.tableOperations().delete(tableName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1f31ca6c/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
index 4451987,0000000..ddf8ad7
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
@@@ -1,308 -1,0 +1,314 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.functional.ConfigurableCompactionIT;
+import org.apache.accumulo.test.functional.FunctionalTestUtils;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.io.Text;
++import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class UserCompactionStrategyIT extends AccumuloClusterHarness {
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 3 * 60;
+ }
+
++ @After
++ public void checkForDanglingFateLocks() {
++ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
++ }
++
+ @Test
+ public void testDropA() throws Exception {
+ Connector c = getConnector();
+
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+
+ writeFlush(c, tableName, "a");
+ writeFlush(c, tableName, "b");
+ // create a file that starts with A containing rows 'a' and 'b'
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+ writeFlush(c, tableName, "c");
+ writeFlush(c, tableName, "d");
+
+ // drop files that start with A
+ CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName());
+ csConfig.setOptions(ImmutableMap.of("dropPrefix", "A", "inputPrefix", "F"));
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+ Assert.assertEquals(ImmutableSet.of("c", "d"), getRows(c, tableName));
+
+ // this compaction should not drop files starting with A
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+ Assert.assertEquals(ImmutableSet.of("c", "d"), getRows(c, tableName));
+ }
+
+ private void testDropNone(Map<String,String> options) throws Exception {
+
+ Connector c = getConnector();
+
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+
+ writeFlush(c, tableName, "a");
+ writeFlush(c, tableName, "b");
+
+ CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName());
+ csConfig.setOptions(options);
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+ Assert.assertEquals(ImmutableSet.of("a", "b"), getRows(c, tableName));
+ }
+
+ @Test
+ public void testDropNone() throws Exception {
+ // test a compaction strategy that selects no files. In this case there is no work to do; we want to ensure it does not hang.
+
+ testDropNone(ImmutableMap.of("inputPrefix", "Z"));
+ }
+
+ @Test
+ public void testDropNone2() throws Exception {
+ // test a compaction strategy that selects no files. This differs from testDropNone() in that shouldCompact() will return true and getCompactionPlan() will
+ // return no work to do.
+
+ testDropNone(ImmutableMap.of("inputPrefix", "Z", "shouldCompact", "true"));
+ }
+
+ @Test
+ public void testPerTableClasspath() throws Exception {
+ // Can't assume that a test-resource will be on the server's classpath
+ Assume.assumeTrue(ClusterType.MINI == getClusterType());
+
+ // test per-table classpath + user specified compaction strategy
+
+ final Connector c = getConnector();
+ final String tableName = getUniqueNames(1)[0];
+ File target = new File(System.getProperty("user.dir"), "target");
+ Assert.assertTrue(target.mkdirs() || target.isDirectory());
+ File destFile = installJar(target, "/TestCompactionStrat.jar");
+ c.tableOperations().create(tableName);
+ c.instanceOperations().setProperty(Property.VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "context1", destFile.toString());
+ c.tableOperations().setProperty(tableName, Property.TABLE_CLASSPATH.getKey(), "context1");
+
+ c.tableOperations().addSplits(tableName, new TreeSet<>(Arrays.asList(new Text("efg"))));
+
+ writeFlush(c, tableName, "a");
+ writeFlush(c, tableName, "b");
+
+ writeFlush(c, tableName, "h");
+ writeFlush(c, tableName, "i");
+
+ Assert.assertEquals(4, FunctionalTestUtils.countRFiles(c, tableName));
+
+ // EfgCompactionStrat will only compact a tablet w/ end row of 'efg'. No other tablets are compacted.
+ CompactionStrategyConfig csConfig = new CompactionStrategyConfig("org.apache.accumulo.test.EfgCompactionStrat");
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+ Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName));
+
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+ Assert.assertEquals(2, FunctionalTestUtils.countRFiles(c, tableName));
+ }
+
+ private static File installJar(File destDir, String jarFile) throws IOException {
+ File destName = new File(destDir, new File(jarFile).getName());
+ FileUtils.copyInputStreamToFile(ConfigurableCompactionIT.class.getResourceAsStream(jarFile), destName);
+ return destName;
+ }
+
+ @Test
+ public void testIterators() throws Exception {
+ // test compaction strategy + iterators
+
+ Connector c = getConnector();
+
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+
+ writeFlush(c, tableName, "a");
+ writeFlush(c, tableName, "b");
+ // create a file that starts with A containing rows 'a' and 'b'
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+ writeFlush(c, tableName, "c");
+ writeFlush(c, tableName, "d");
+
+ Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName));
+
+ // drop files that start with A
+ CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName());
+ csConfig.setOptions(ImmutableMap.of("inputPrefix", "F"));
+
+ IteratorSetting iterConf = new IteratorSetting(21, "myregex", RegExFilter.class);
+ RegExFilter.setRegexs(iterConf, "a|c", null, null, null, false);
+
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig).setIterators(Arrays.asList(iterConf)));
+
+ // compaction strategy should only be applied to one file. If it's applied to both, then row 'b' would be dropped by the filter.
+ Assert.assertEquals(ImmutableSet.of("a", "b", "c"), getRows(c, tableName));
+
+ Assert.assertEquals(2, FunctionalTestUtils.countRFiles(c, tableName));
+
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+ // ensure that iterator is not applied
+ Assert.assertEquals(ImmutableSet.of("a", "b", "c"), getRows(c, tableName));
+
+ Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName));
+ }
+
+ @Test
+ public void testFileSize() throws Exception {
+ Connector c = getConnector();
+
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+
+ // write random data because it's very unlikely it will compress
+ writeRandomValue(c, tableName, 1 << 16);
+ writeRandomValue(c, tableName, 1 << 16);
+
+ writeRandomValue(c, tableName, 1 << 9);
+ writeRandomValue(c, tableName, 1 << 7);
+ writeRandomValue(c, tableName, 1 << 6);
+
+ Assert.assertEquals(5, FunctionalTestUtils.countRFiles(c, tableName));
+
+ CompactionStrategyConfig csConfig = new CompactionStrategyConfig(SizeCompactionStrategy.class.getName());
+ csConfig.setOptions(ImmutableMap.of("size", "" + (1 << 15)));
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+ Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName));
+
+ csConfig = new CompactionStrategyConfig(SizeCompactionStrategy.class.getName());
+ csConfig.setOptions(ImmutableMap.of("size", "" + (1 << 17)));
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+ Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName));
+
+ }
+
+ @Test
+ public void testConcurrent() throws Exception {
+ // two compactions without iterators or strategy should be able to run concurrently
+
+ Connector c = getConnector();
+
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+
+ // write random data because it's very unlikely it will compress
+ writeRandomValue(c, tableName, 1 << 16);
+ writeRandomValue(c, tableName, 1 << 16);
+
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(false));
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+ Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName));
+
+ writeRandomValue(c, tableName, 1 << 16);
+
+ IteratorSetting iterConfig = new IteratorSetting(30, SlowIterator.class);
+ SlowIterator.setSleepTime(iterConfig, 1000);
+
+ long t1 = System.currentTimeMillis();
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(false).setIterators(Arrays.asList(iterConfig)));
+ try {
+ // this compaction should fail because the previous one set iterators
+ c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+ if (System.currentTimeMillis() - t1 < 2000)
+ Assert.fail("Expected compaction to fail because another concurrent compaction set iterators");
+ } catch (AccumuloException e) {}
+ }
+
+ void writeRandomValue(Connector c, String tableName, int size) throws Exception {
+ Random rand = new Random();
+
+ byte data1[] = new byte[size];
+ rand.nextBytes(data1);
+
+ BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+
+ Mutation m1 = new Mutation("r" + rand.nextInt(909090));
+ m1.put("data", "bl0b", new Value(data1));
+
+ bw.addMutation(m1);
+ bw.close();
+ c.tableOperations().flush(tableName, null, null, true);
+ }
+
+ private Set<String> getRows(Connector c, String tableName) throws TableNotFoundException {
+ Set<String> rows = new HashSet<>();
+ Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
+
+ for (Entry<Key,Value> entry : scanner)
+ rows.add(entry.getKey().getRowData().toString());
+ return rows;
+
+ }
+
+ private void writeFlush(Connector conn, String tablename, String row) throws Exception {
+ BatchWriter bw = conn.createBatchWriter(tablename, new BatchWriterConfig());
+ Mutation m = new Mutation(row);
+ m.put("", "", "");
+ bw.addMutation(m);
+ bw.close();
+ conn.tableOperations().flush(tablename, null, null, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1f31ca6c/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
index 5808804,0000000..52fc57f
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
@@@ -1,298 -1,0 +1,270 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.functional;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
- import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
- import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
- import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.Authorizations;
- import org.apache.accumulo.core.zookeeper.ZooUtil;
- import org.apache.accumulo.fate.AdminUtil;
- import org.apache.accumulo.fate.AdminUtil.FateStatus;
- import org.apache.accumulo.fate.ZooStore;
- import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
- import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
+import org.apache.hadoop.io.Text;
- import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConcurrentDeleteTableIT extends AccumuloClusterHarness {
+
+ @Test
+ public void testConcurrentDeleteTablesOps() throws Exception {
+ final Connector c = getConnector();
+ String[] tables = getUniqueNames(2);
+
+ TreeSet<Text> splits = createSplits();
+
+ ExecutorService es = Executors.newFixedThreadPool(20);
+
+ int count = 0;
+ for (final String table : tables) {
+ c.tableOperations().create(table);
+ c.tableOperations().addSplits(table, splits);
+ writeData(c, table);
+ if (count == 1) {
+ c.tableOperations().flush(table, null, null, true);
+ }
+ count++;
+
+ int numDeleteOps = 20;
+ final CountDownLatch cdl = new CountDownLatch(numDeleteOps);
+
+ List<Future<?>> futures = new ArrayList<>();
+
+ for (int i = 0; i < numDeleteOps; i++) {
+ Future<?> future = es.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ cdl.countDown();
+ cdl.await();
+ c.tableOperations().delete(table);
+ } catch (TableNotFoundException e) {
+ // expected
+ } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ futures.add(future);
+ }
+
+ for (Future<?> future : futures) {
+ future.get();
+ }
+
+ try {
+ c.createScanner(table, Authorizations.EMPTY);
+ Assert.fail("Expected table " + table + " to be gone.");
+ } catch (TableNotFoundException tnfe) {
+ // expected
+ }
+
- FateStatus fateStatus = getFateStatus();
-
- // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks.
- Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
- Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
++ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
+ }
+
+ es.shutdown();
+ }
+
+ private TreeSet<Text> createSplits() {
+ TreeSet<Text> splits = new TreeSet<>();
+
+ for (int i = 0; i < 1000; i++) {
+ Text split = new Text(String.format("%09x", i * 100000));
+ splits.add(split);
+ }
+ return splits;
+ }
+
+ private static abstract class DelayedTableOp implements Runnable {
+ private CountDownLatch cdl;
+
+ DelayedTableOp(CountDownLatch cdl) {
+ this.cdl = cdl;
+ }
+
+ public void run() {
+ try {
+ cdl.countDown();
+ cdl.await();
+ Thread.sleep(10);
+ doTableOp();
- } catch (TableNotFoundException e) {
++ } catch (TableNotFoundException | TableOfflineException e) {
+ // expected
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected abstract void doTableOp() throws Exception;
+ }
+
+ @Test
+ public void testConcurrentFateOpsWithDelete() throws Exception {
+ final Connector c = getConnector();
+ String[] tables = getUniqueNames(2);
+
+ TreeSet<Text> splits = createSplits();
+
+ int numOperations = 8;
+
+ ExecutorService es = Executors.newFixedThreadPool(numOperations);
+
+ int count = 0;
+ for (final String table : tables) {
+ c.tableOperations().create(table);
+ c.tableOperations().addSplits(table, splits);
+ writeData(c, table);
+ if (count == 1) {
+ c.tableOperations().flush(table, null, null, true);
+ }
+ count++;
+
+ // increment this for each test
+ final CountDownLatch cdl = new CountDownLatch(numOperations);
+
+ List<Future<?>> futures = new ArrayList<>();
+
+ futures.add(es.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ cdl.countDown();
+ cdl.await();
+ c.tableOperations().delete(table);
+ } catch (TableNotFoundException | TableOfflineException e) {
+ // expected
+ } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ c.tableOperations().compact(table, new CompactionConfig());
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ c.tableOperations().merge(table, null, null);
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ Map<String,String> m = Collections.emptyMap();
+ Set<String> s = Collections.emptySet();
+ c.tableOperations().clone(table, table + "_clone", true, m, s);
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ c.tableOperations().deleteRows(table, null, null);
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ c.tableOperations().cancelCompaction(table);
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ c.tableOperations().rename(table, table + "_renamed");
+ }
+ }));
+
+ futures.add(es.submit(new DelayedTableOp(cdl) {
+ @Override
+ protected void doTableOp() throws Exception {
+ c.tableOperations().offline(table);
+ }
+ }));
+
+ Assert.assertEquals(numOperations, futures.size());
+
+ for (Future<?> future : futures) {
+ future.get();
+ }
+
+ try {
+ c.createScanner(table, Authorizations.EMPTY);
+ Assert.fail("Expected table " + table + " to be gone.");
+ } catch (TableNotFoundException tnfe) {
+ // expected
+ }
+
- FateStatus fateStatus = getFateStatus();
-
- // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks.
- Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
- Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
++ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
+ }
+
+ es.shutdown();
+ }
+
- private FateStatus getFateStatus() throws KeeperException, InterruptedException {
- Instance instance = getConnector().getInstance();
- AdminUtil<String> admin = new AdminUtil<>(false);
- String secret = getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET);
- IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
- ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
- FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null);
- return fateStatus;
- }
-
+ private void writeData(Connector c, String table) throws TableNotFoundException, MutationsRejectedException {
+ BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
+ try {
+ Random rand = new Random();
+ for (int i = 0; i < 1000; i++) {
+ Mutation m = new Mutation(String.format("%09x", rand.nextInt(100000 * 1000)));
+ m.put("m", "order", "" + i);
+ bw.addMutation(m);
+ }
+ } finally {
+ bw.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1f31ca6c/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
index 30f4476,0000000..c5f9eab
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
@@@ -1,80 -1,0 +1,82 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ * See ACCUMULO-779
+ */
+public class FateStarvationIT extends AccumuloClusterHarness {
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 2 * 60;
+ }
+
+ @Test
+ public void run() throws Exception {
+ String tableName = getUniqueNames(1)[0];
+ Connector c = getConnector();
+ c.tableOperations().create(tableName);
+
+ c.tableOperations().addSplits(tableName, TestIngest.getSplitPoints(0, 100000, 50));
+
+ TestIngest.Opts opts = new TestIngest.Opts();
+ opts.random = 89;
+ opts.timestamp = 7;
+ opts.dataSize = 50;
+ opts.rows = 100000;
+ opts.cols = 1;
+ opts.setTableName(tableName);
+ ClientConfiguration clientConf = cluster.getClientConfig();
+ if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+ opts.updateKerberosCredentials(clientConf);
+ } else {
+ opts.setPrincipal(getAdminPrincipal());
+ }
+ TestIngest.ingest(c, opts, new BatchWriterOpts());
+
+ c.tableOperations().flush(tableName, null, null, true);
+
+ List<Text> splits = new ArrayList<>(TestIngest.getSplitPoints(0, 100000, 67));
+ Random rand = new Random();
+
+ for (int i = 0; i < 100; i++) {
+ int idx1 = rand.nextInt(splits.size() - 1);
+ int idx2 = rand.nextInt(splits.size() - (idx1 + 1)) + idx1 + 1;
+
+ c.tableOperations().compact(tableName, splits.get(idx1), splits.get(idx2), false, false);
+ }
+
+ c.tableOperations().offline(tableName);
++
++ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1f31ca6c/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index 829293e,0000000..8659922
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@@ -1,186 -1,0 +1,216 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertFalse;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
++import org.apache.accumulo.cluster.AccumuloCluster;
++import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.client.Connector;
++import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
++import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
++import org.apache.accumulo.core.zookeeper.ZooUtil;
++import org.apache.accumulo.fate.AdminUtil;
++import org.apache.accumulo.fate.AdminUtil.FateStatus;
++import org.apache.accumulo.fate.ZooStore;
++import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl.LogWriter;
++import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
++import org.apache.zookeeper.KeeperException;
++import org.junit.Assert;
+
+import com.google.common.collect.Iterators;
+
+public class FunctionalTestUtils {
+
+ public static int countRFiles(Connector c, String tableName) throws Exception {
+ Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ String tableId = c.tableOperations().tableIdMap().get(tableName);
+ scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+ scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+ return Iterators.size(scanner.iterator());
+ }
+
+ static void checkRFiles(Connector c, String tableName, int minTablets, int maxTablets, int minRFiles, int maxRFiles) throws Exception {
+ Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ String tableId = c.tableOperations().tableIdMap().get(tableName);
+ scanner.setRange(new Range(new Text(tableId + ";"), true, new Text(tableId + "<"), true));
+ scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+
+ HashMap<Text,Integer> tabletFileCounts = new HashMap<>();
+
+ for (Entry<Key,Value> entry : scanner) {
+
+ Text row = entry.getKey().getRow();
+
+ Integer count = tabletFileCounts.get(row);
+ if (count == null)
+ count = 0;
+ if (entry.getKey().getColumnFamily().equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) {
+ count = count + 1;
+ }
+
+ tabletFileCounts.put(row, count);
+ }
+
+ if (tabletFileCounts.size() < minTablets || tabletFileCounts.size() > maxTablets) {
+ throw new Exception("Did not find expected number of tablets " + tabletFileCounts.size());
+ }
+
+ Set<Entry<Text,Integer>> es = tabletFileCounts.entrySet();
+ for (Entry<Text,Integer> entry : es) {
+ if (entry.getValue() > maxRFiles || entry.getValue() < minRFiles) {
+ throw new Exception("tablet " + entry.getKey() + " has " + entry.getValue() + " map files");
+ }
+ }
+ }
+
+ static public void bulkImport(Connector c, FileSystem fs, String table, String dir) throws Exception {
+ String failDir = dir + "_failures";
+ Path failPath = new Path(failDir);
+ fs.delete(failPath, true);
+ fs.mkdirs(failPath);
+
+ // Ensure server can read/modify files
+ c.tableOperations().importDirectory(table, dir, failDir, false);
+
+ if (fs.listStatus(failPath).length > 0) {
+ throw new Exception("Some files failed to bulk import");
+ }
+
+ }
+
+ static public void checkSplits(Connector c, String table, int min, int max) throws Exception {
+ Collection<Text> splits = c.tableOperations().listSplits(table);
+ if (splits.size() < min || splits.size() > max) {
+ throw new Exception("# of table splits points out of range, #splits=" + splits.size() + " table=" + table + " min=" + min + " max=" + max);
+ }
+ }
+
+ static public void createRFiles(final Connector c, final FileSystem fs, String path, int rows, int splits, int threads) throws Exception {
+ fs.delete(new Path(path), true);
+ ExecutorService threadPool = Executors.newFixedThreadPool(threads);
+ final AtomicBoolean fail = new AtomicBoolean(false);
+ for (int i = 0; i < rows; i += rows / splits) {
+ final TestIngest.Opts opts = new TestIngest.Opts();
+ opts.outputFile = String.format("%s/mf%s", path, i);
+ opts.random = 56;
+ opts.timestamp = 1;
+ opts.dataSize = 50;
+ opts.rows = rows / splits;
+ opts.startRow = i;
+ opts.cols = 1;
+ threadPool.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ TestIngest.ingest(c, fs, opts, new BatchWriterOpts());
+ } catch (Exception e) {
+ fail.set(true);
+ }
+ }
+ });
+ }
+ threadPool.shutdown();
+ threadPool.awaitTermination(1, TimeUnit.HOURS);
+ assertFalse(fail.get());
+ }
+
+ static public String readAll(InputStream is) throws IOException {
+ byte[] buffer = new byte[4096];
+ StringBuilder result = new StringBuilder();
+ while (true) {
+ int n = is.read(buffer);
+ if (n <= 0)
+ break;
+ result.append(new String(buffer, 0, n));
+ }
+ return result.toString();
+ }
+
+ public static String readAll(MiniAccumuloClusterImpl c, Class<?> klass, Process p) throws Exception {
+ for (LogWriter writer : c.getLogWriters())
+ writer.flush();
+ return readAll(new FileInputStream(c.getConfig().getLogDir() + "/" + klass.getSimpleName() + "_" + p.hashCode() + ".out"));
+ }
+
+ static Mutation nm(String row, String cf, String cq, Value value) {
+ Mutation m = new Mutation(new Text(row));
+ m.put(new Text(cf), new Text(cq), value);
+ return m;
+ }
+
+ static Mutation nm(String row, String cf, String cq, String value) {
+ return nm(row, cf, cq, new Value(value.getBytes()));
+ }
+
+ public static SortedSet<Text> splits(String[] splits) {
+ SortedSet<Text> result = new TreeSet<>();
+ for (String split : splits)
+ result.add(new Text(split));
+ return result;
+ }
+
++ public static void assertNoDanglingFateLocks(Instance instance, AccumuloCluster cluster) {
++ FateStatus fateStatus = getFateStatus(instance, cluster);
++ Assert.assertEquals("Dangling FATE locks : " + fateStatus.getDanglingHeldLocks(), 0, fateStatus.getDanglingHeldLocks().size());
++ Assert.assertEquals("Dangling FATE locks : " + fateStatus.getDanglingWaitingLocks(), 0, fateStatus.getDanglingWaitingLocks().size());
++ }
++
++ private static FateStatus getFateStatus(Instance instance, AccumuloCluster cluster) {
++ try {
++ AdminUtil<String> admin = new AdminUtil<>(false);
++ String secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
++ IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
++ ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
++ FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null);
++ return fateStatus;
++ } catch (KeeperException | InterruptedException e) {
++ throw new RuntimeException(e);
++ }
++ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1f31ca6c/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java
index 0c22196,0000000..47438a6
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RenameIT.java
@@@ -1,74 -1,0 +1,76 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.junit.Test;
+
+public class RenameIT extends AccumuloClusterHarness {
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 2 * 60;
+ }
+
+ @Test
+ public void renameTest() throws Exception {
+ String[] tableNames = getUniqueNames(2);
+ String name1 = tableNames[0];
+ String name2 = tableNames[1];
+ BatchWriterOpts bwOpts = new BatchWriterOpts();
+ ScannerOpts scanOpts = new ScannerOpts();
+ TestIngest.Opts opts = new TestIngest.Opts();
+ opts.createTable = true;
+ opts.setTableName(name1);
+
+ final ClientConfiguration clientConfig = cluster.getClientConfig();
+ if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+ opts.updateKerberosCredentials(clientConfig);
+ } else {
+ opts.setPrincipal(getAdminPrincipal());
+ }
+
+ Connector c = getConnector();
+ TestIngest.ingest(c, opts, bwOpts);
+ c.tableOperations().rename(name1, name2);
+ TestIngest.ingest(c, opts, bwOpts);
+ VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+
+ if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+ vopts.updateKerberosCredentials(clientConfig);
+ } else {
+ vopts.setPrincipal(getAdminPrincipal());
+ }
+
+ vopts.setTableName(name2);
+ VerifyIngest.verifyIngest(c, vopts, scanOpts);
+ c.tableOperations().delete(name1);
+ c.tableOperations().rename(name2, name1);
+ vopts.setTableName(name1);
+ VerifyIngest.verifyIngest(c, vopts, scanOpts);
++
++ FunctionalTestUtils.assertNoDanglingFateLocks(getConnector().getInstance(), getCluster());
+ }
+
+}