You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/06/04 20:53:17 UTC
[36/43] accumulo git commit: ACCUMULO-3871 move ITs into distro jar,
stop building test jar
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/VolumeChooserIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeChooserIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeChooserIT.java
new file mode 100644
index 0000000..c2dee9f
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeChooserIT.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.fs.PerTableVolumeChooser;
+import org.apache.accumulo.server.fs.PreferredVolumeChooser;
+import org.apache.accumulo.server.fs.RandomVolumeChooser;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
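+/**
+ * Integration tests that check which instance volumes a table's files are written to under different volume chooser settings.
+ */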
+public class VolumeChooserIT extends ConfigurableMacBase {
+
+ private static final Text EMPTY = new Text();
+ private static final Value EMPTY_VALUE = new Value(new byte[] {});
+ private File volDirBase;
+ private Path v1, v2, v3, v4;
+ private String[] rows = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(",");
+ private String namespace1;
+ private String namespace2;
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 30; // timeout in seconds, per the method name
+  }
+
+  /**
+   * Sets up the mini cluster used by the tests in this class: two tablet servers, PerTableVolumeChooser as the general
+   * volume chooser, and v1, v2 and v4 as the instance volumes. v3 gets a path but is deliberately not an instance volume,
+   * so a test can name a preferred volume that the instance does not have.
+   */
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // Two tablet servers, plus one namespace name for each of the (at most two) tables a test creates
+    cfg.setNumTservers(2);
+    namespace1 = "ns_" + getUniqueNames(2)[0];
+    namespace2 = "ns_" + getUniqueNames(2)[1];
+
+    // With PerTableVolumeChooser as the general chooser, different choosers can be specified per namespace/table
+    Map<String,String> site = new HashMap<String,String>();
+    site.put(Property.GENERAL_VOLUME_CHOOSER.getKey(), PerTableVolumeChooser.class.getName());
+    cfg.setSiteConfig(site);
+
+    // Four volume paths under <cluster dir>/volumes
+    File clusterDir = cfg.getDir();
+    volDirBase = new File(clusterDir, "volumes");
+    v1 = new Path("file://" + new File(volDirBase, "v1").getAbsolutePath());
+    v2 = new Path("file://" + new File(volDirBase, "v2").getAbsolutePath());
+    v3 = new Path("file://" + new File(volDirBase, "v3").getAbsolutePath());
+    v4 = new Path("file://" + new File(volDirBase, "v4").getAbsolutePath());
+
+    // Only volumes 1, 2 and 4 become instance volumes, leaving v3 outside the set the choosers may pick from
+    cfg.setProperty(Property.INSTANCE_VOLUMES, v1 + "," + v2 + "," + v4);
+
+    // use raw local file system so walogs sync and flush will work
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+
+    super.configure(cfg, hadoopCoreSite);
+  }
+
+  /** Adds the ten split points b, e, g, j, l, o, q, t, v and y to the given table. */
+  public void addSplits(Connector connector, String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    SortedSet<Text> splitPoints = new TreeSet<Text>();
+    for (String row : "b,e,g,j,l,o,q,t,v,y".split(","))
+      splitPoints.add(new Text(row));
+    connector.tableOperations().addSplits(tableName, splitPoints);
+  }
+
+  /** Writes one empty cell per row in {@code rows}, flushes the table, then checks a scan returns exactly those rows in order. */
+  public void writeAndReadData(Connector connector, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+    for (String s : rows) {
+      Mutation m = new Mutation(new Text(s));
+      m.put(EMPTY, EMPTY, EMPTY_VALUE);
+      bw.addMutation(m);
+    }
+    bw.close();
+    // Write the data to disk, read it back
+    connector.tableOperations().flush(tableName, null, null, true);
+    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+    int i = 0;
+    for (Entry<Key,Value> entry : scanner) {
+      assertEquals("Data read is not data written", rows[i++], entry.getKey().getRow().toString());
+    }
+    assertEquals("Wrong number of rows read", rows.length, i); // the loop alone would pass on an empty or short scan
+  }
+
+  /**
+   * Checks that every data file recorded in the metadata table within {@code tableRange} is on one of the comma-separated
+   * volumes in {@code vol}, and that there are exactly 11 such files (the callers add 10 splits, giving 11 tablets).
+   */
+  public void verifyVolumes(Connector connector, String tableName, Range tableRange, String vol) throws TableNotFoundException {
+    String[] allowed = vol.split(",");
+
+    Scanner metaScanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    metaScanner.setRange(tableRange);
+    metaScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    int files = 0;
+    for (Entry<Key,Value> entry : metaScanner) {
+      String file = entry.getKey().getColumnQualifier().toString();
+      boolean onAllowedVolume = false;
+      for (String candidate : allowed)
+        onAllowedVolume |= file.contains(candidate);
+      assertTrue("Data not written to the correct volumes", onAllowedVolume);
+      files++;
+    }
+    assertEquals("Wrong number of files", 11, files);
+  }
+
+  // Two tables with 10 split points each, in separate namespaces that both use the PreferredVolumeChooser: the first prefers v2, the second v1.
+  @Test
+  public void twoTablesPreferredVolumeChooser() throws Exception {
+    log.info("Starting twoTablesPreferredVolumeChooser");
+
+    // First namespace
+    Connector connector = getConnector();
+    connector.namespaceOperations().create(namespace1);
+
+    // Select the PreferredVolumeChooser for the namespace, then name v2 as its preferred volume
+    String key = Property.TABLE_VOLUME_CHOOSER.getKey();
+    String value = PreferredVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace1, key, value);
+
+    key = "table.custom.preferredVolumes";
+    value = v2.toString();
+    connector.namespaceOperations().setProperty(namespace1, key, value);
+
+    // First table lives in the first namespace
+    String firstTable = namespace1 + ".1";
+    connector.tableOperations().create(firstTable);
+    String firstTableId = connector.tableOperations().tableIdMap().get(firstTable);
+
+    // Split into 11 tablets
+    addSplits(connector, firstTable);
+    // Put a row or more in every tablet and flush
+    writeAndReadData(connector, firstTable);
+    // All of the table's files must be on v2
+    verifyVolumes(connector, firstTable, TabletsSection.getRange(firstTableId), v2.toString());
+
+    connector.namespaceOperations().create(namespace2);
+
+    // Second namespace: same chooser, but v1 is the preferred volume
+    key = Property.TABLE_VOLUME_CHOOSER.getKey();
+    value = PreferredVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace2, key, value);
+
+    key = "table.custom.preferredVolumes";
+    value = v1.toString();
+    connector.namespaceOperations().setProperty(namespace2, key, value);
+
+    // Second table lives in the second namespace
+    String secondTable = namespace2 + ".1";
+
+    connector.tableOperations().create(secondTable);
+    String secondTableId = connector.tableOperations().tableIdMap().get(secondTable);
+
+    // Split into 11 tablets
+    addSplits(connector, secondTable);
+    // Put a row or more in every tablet and flush
+    writeAndReadData(connector, secondTable);
+    // All of the table's files must be on v1
+    verifyVolumes(connector, secondTable, TabletsSection.getRange(secondTableId), v1.toString());
+  }
+
+  // Test that uses two tables with 10 split points each. They each use the RandomVolumeChooser, so files may be on any of the instance volumes.
+  @Test
+  public void twoTablesRandomVolumeChooser() throws Exception {
+    log.info("Starting twoTablesRandomVolumeChooser()");
+
+    // Create namespace
+    Connector connector = getConnector();
+    connector.namespaceOperations().create(namespace1);
+
+    // Set properties on the namespace
+    String propertyName = Property.TABLE_VOLUME_CHOOSER.getKey();
+    String chooser = RandomVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace1, propertyName, chooser);
+
+    // Create table1 on namespace1
+    String tableName = namespace1 + ".1";
+    connector.tableOperations().create(tableName);
+    String tableID = connector.tableOperations().tableIdMap().get(tableName);
+
+    // Add 10 splits to the table
+    addSplits(connector, tableName);
+    // Write some data to the table
+    writeAndReadData(connector, tableName);
+    // Verify the new files are written to the instance volumes (v1, v2 or v4)
+    verifyVolumes(connector, tableName, TabletsSection.getRange(tableID), v1.toString() + "," + v2.toString() + "," + v4.toString());
+
+    connector.namespaceOperations().create(namespace2);
+
+    // Set properties on the namespace
+    propertyName = Property.TABLE_VOLUME_CHOOSER.getKey();
+    chooser = RandomVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace2, propertyName, chooser);
+
+    // Create table2 on namespace2
+    String tableName2 = namespace2 + ".1";
+    connector.tableOperations().create(tableName2);
+    // Use table2's own id: the metadata range checked below must cover table2's files, not table1's
+    String tableID2 = connector.tableOperations().tableIdMap().get(tableName2);
+
+    // Add 10 splits to the table
+    addSplits(connector, tableName2);
+    // Write some data to the table
+    writeAndReadData(connector, tableName2);
+    // Verify the new files are written to the instance volumes (v1, v2 or v4)
+    verifyVolumes(connector, tableName2, TabletsSection.getRange(tableID2), v1.toString() + "," + v2.toString() + "," + v4.toString());
+  }
+
+  // Test that uses two tables with 10 split points each. The first uses the RandomVolumeChooser and the second uses the
+  // PreferredVolumeChooser (preferring v1) to choose volumes.
+  @Test
+  public void twoTablesDiffChoosers() throws Exception {
+    log.info("Starting twoTablesDiffChoosers");
+
+    // First namespace
+    Connector connector = getConnector();
+    connector.namespaceOperations().create(namespace1);
+
+    // Select the RandomVolumeChooser for the first namespace
+    String key = Property.TABLE_VOLUME_CHOOSER.getKey();
+    String value = RandomVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace1, key, value);
+
+    // First table lives in the first namespace
+    String firstTable = namespace1 + ".1";
+    connector.tableOperations().create(firstTable);
+    String firstTableId = connector.tableOperations().tableIdMap().get(firstTable);
+
+    // Split into 11 tablets
+    addSplits(connector, firstTable);
+    // Put a row or more in every tablet and flush
+    writeAndReadData(connector, firstTable);
+    // The files may be on any of the instance volumes
+
+    verifyVolumes(connector, firstTable, TabletsSection.getRange(firstTableId), v1.toString() + "," + v2.toString() + "," + v4.toString());
+
+    connector.namespaceOperations().create(namespace2);
+
+    // Second namespace: PreferredVolumeChooser with v1 as the preferred volume
+    key = Property.TABLE_VOLUME_CHOOSER.getKey();
+    value = PreferredVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace2, key, value);
+
+    key = "table.custom.preferredVolumes";
+    value = v1.toString();
+    connector.namespaceOperations().setProperty(namespace2, key, value);
+
+    // Second table lives in the second namespace
+    String secondTable = namespace2 + ".1";
+    connector.tableOperations().create(secondTable);
+    String secondTableId = connector.tableOperations().tableIdMap().get(secondTable);
+
+    // Split into 11 tablets
+    addSplits(connector, secondTable);
+    // Put a row or more in every tablet and flush
+    writeAndReadData(connector, secondTable);
+    // All of the table's files must be on v1
+    verifyVolumes(connector, secondTable, TabletsSection.getRange(secondTableId), v1.toString());
+  }
+
+  // Test that uses one table with 10 split points. It uses the PreferredVolumeChooser, but no preferred volume is specified. This means that the volume
+  // is chosen randomly from all instance volumes.
+  @Test
+  public void missingVolumePreferredVolumeChooser() throws Exception {
+    log.info("Starting missingVolumePreferredVolumeChooser");
+
+    // The only namespace this test needs
+    Connector connector = getConnector();
+    connector.namespaceOperations().create(namespace1);
+
+    // Select the PreferredVolumeChooser, deliberately without setting table.custom.preferredVolumes
+    String chooserKey = Property.TABLE_VOLUME_CHOOSER.getKey();
+    String chooserClass = PreferredVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace1, chooserKey, chooserClass);
+
+    // The table lives in that namespace
+    String table = namespace1 + ".1";
+    connector.tableOperations().create(table);
+    String tableId = connector.tableOperations().tableIdMap().get(table);
+
+    // Split into 11 tablets
+    addSplits(connector, table);
+    // Put a row or more in every tablet and flush
+    writeAndReadData(connector, table);
+    // The files may be on any of the instance volumes
+    verifyVolumes(connector, table, TabletsSection.getRange(tableId), v1.toString() + "," + v2.toString() + "," + v4.toString());
+  }
+
+  // Test that uses one table with 10 split points. It uses the PreferredVolumeChooser, but the preferred volume (v3) is not an instance volume. This means
+  // that the volume is chosen randomly from all instance volumes.
+  @Test
+  public void notInstancePreferredVolumeChooser() throws Exception {
+    log.info("Starting notInstancePreferredVolumeChooser");
+
+    // The only namespace this test needs
+    Connector connector = getConnector();
+    connector.namespaceOperations().create(namespace1);
+
+    // Select the PreferredVolumeChooser, then prefer v3, which configure() left out of the instance volumes
+    String key = Property.TABLE_VOLUME_CHOOSER.getKey();
+    String value = PreferredVolumeChooser.class.getName();
+    connector.namespaceOperations().setProperty(namespace1, key, value);
+
+    key = "table.custom.preferredVolumes";
+    value = v3.toString();
+    connector.namespaceOperations().setProperty(namespace1, key, value);
+
+    // The table lives in that namespace
+    String table = namespace1 + ".1";
+    connector.tableOperations().create(table);
+    String tableId = connector.tableOperations().tableIdMap().get(table);
+
+    // Split into 11 tablets
+    addSplits(connector, table);
+    // Put a row or more in every tablet and flush
+    writeAndReadData(connector, table);
+    // The files may be on any of the instance volumes
+    verifyVolumes(connector, table, TabletsSection.getRange(tableId), v1.toString() + "," + v2.toString() + "," + v4.toString());
+  }
+
+  // Test that uses one table with 10 split points. It does not specify a specific chooser, so the volume is chosen randomly from all instance volumes.
+  @Test
+  public void chooserNotSpecified() throws Exception {
+    log.info("Starting chooserNotSpecified");
+
+    // A table outside the test namespaces, with no chooser properties set on it
+    Connector connector = getConnector();
+    String table = getUniqueNames(2)[0];
+    connector.tableOperations().create(table);
+    String tableId = connector.tableOperations().tableIdMap().get(table);
+
+    // Split into 11 tablets
+    addSplits(connector, table);
+    // Put a row or more in every tablet and flush
+    writeAndReadData(connector, table);
+
+    // The files may be on any of the instance volumes
+    verifyVolumes(connector, table, TabletsSection.getRange(tableId), v1.toString() + "," + v2.toString() + "," + v4.toString());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
new file mode 100644
index 0000000..c25370d
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.DiskUsage;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.init.Initialize;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
+import org.apache.accumulo.server.util.Admin;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class VolumeIT extends ConfigurableMacBase {
+
+ private static final Text EMPTY = new Text();
+ private static final Value EMPTY_VALUE = new Value(new byte[] {});
+ private File volDirBase;
+ private Path v1, v2;
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 5 * 60; // five minutes, expressed in seconds
+ }
+
+  /** Runs the mini cluster on two volumes in the local file system, v1 and v2, with a 5s ZooKeeper timeout. */
+  @SuppressWarnings("deprecation")
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    volDirBase = new File(cfg.getDir(), "volumes");
+    v1 = new Path("file://" + new File(volDirBase, "v1").getAbsolutePath());
+    v2 = new Path("file://" + new File(volDirBase, "v2").getAbsolutePath());
+
+    // Run MAC on two locations in the local file system
+    URI firstVolume = v1.toUri();
+    cfg.setProperty(Property.INSTANCE_DFS_DIR, firstVolume.getPath());
+    // NOTE(review): a file:// URI built this way presumably has no host, which would make this value "filenull" -
+    // confirm whether that matters given that INSTANCE_VOLUMES is set as well
+    cfg.setProperty(Property.INSTANCE_DFS_URI, firstVolume.getScheme() + firstVolume.getHost());
+    cfg.setProperty(Property.INSTANCE_VOLUMES, v1 + "," + v2);
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+
+    // use raw local file system so walogs sync and flush will work
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+
+    super.configure(cfg, hadoopCoreSite);
+  }
+
+  // Writes to a table with three splits, then checks its four files are on v1 or v2 and that its reported disk usage is in the expected range.
+  @Test
+  public void test() throws Exception {
+    // create a table with some splits
+    Connector connector = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    connector.tableOperations().create(tableName);
+    SortedSet<Text> partitions = new TreeSet<Text>();
+    for (String s : "d,m,t".split(","))
+      partitions.add(new Text(s));
+    connector.tableOperations().addSplits(tableName, partitions);
+    // scribble over the splits
+    BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+    String[] rows = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(",");
+    for (String s : rows) {
+      Mutation m = new Mutation(new Text(s));
+      m.put(EMPTY, EMPTY, EMPTY_VALUE);
+      bw.addMutation(m);
+    }
+    bw.close();
+    // write the data to disk, read it back
+    connector.tableOperations().flush(tableName, null, null, true);
+    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+    int i = 0;
+    for (Entry<Key,Value> entry : scanner) {
+      assertEquals(rows[i++], entry.getKey().getRow().toString());
+    }
+    assertEquals(rows.length, i); // the loop alone would pass on an empty or short scan
+    // verify the new files are written to the different volumes; look the table id up instead of assuming it is "1"
+    String tableId = connector.tableOperations().tableIdMap().get(tableName);
+    scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    int fileCount = 0;
+    for (Entry<Key,Value> entry : scanner) {
+      String file = entry.getKey().getColumnQualifier().toString();
+      assertTrue(file.contains(v1.toString()) || file.contains(v2.toString()));
+      fileCount++;
+    }
+    assertEquals(4, fileCount);
+    List<DiskUsage> diskUsage = connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
+    assertEquals(1, diskUsage.size());
+    long usage = diskUsage.get(0).getUsage().longValue();
+    log.info("usage " + usage);
+    assertTrue(usage > 700 && usage < 800);
+  }
+
+  /**
+   * Asserts that the scanner yields exactly the expected "row:family:qualifier:value" strings, ignoring order. Sorts {@code expected} in place.
+   */
+  private void verifyData(List<String> expected, Scanner createScanner) {
+    List<String> seen = new ArrayList<String>();
+    for (Entry<Key,Value> entry : createScanner) {
+      Key key = entry.getKey();
+      seen.add(key.getRow() + ":" + key.getColumnFamily() + ":" + key.getColumnQualifier() + ":" + entry.getValue());
+    }
+
+    Collections.sort(seen);
+    Collections.sort(expected);
+    Assert.assertEquals(expected, seen);
+  }
+
+ @Test
+ public void testRelativePaths() throws Exception {
+ // Rewrites a table's v1 file entries in the metadata table as relative paths and checks the table stays readable.
+ List<String> expected = new ArrayList<String>();
+
+ Connector connector = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName, new NewTableConfiguration().withoutDefaultIterators());
+ // NOTE(review): presumably created without default iterators so that both values written per cell below survive - confirm
+ String tableId = connector.tableOperations().tableIdMap().get(tableName);
+
+ SortedSet<Text> partitions = new TreeSet<Text>();
+ // with some splits
+ for (String s : "c,g,k,p,s,v".split(","))
+ partitions.add(new Text(s));
+
+ connector.tableOperations().addSplits(tableName, partitions);
+
+ BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+
+ // create two files in each tablet
+ // first pass: value "1" for every row, followed by a table flush
+ String[] rows = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(",");
+ for (String s : rows) {
+ Mutation m = new Mutation(s);
+ m.put("cf1", "cq1", "1");
+ bw.addMutation(m);
+ expected.add(s + ":cf1:cq1:1");
+ }
+
+ bw.flush();
+ connector.tableOperations().flush(tableName, null, null, true);
+ // second pass: value "2" for every row, followed by another table flush
+ for (String s : rows) {
+ Mutation m = new Mutation(s);
+ m.put("cf1", "cq1", "2");
+ bw.addMutation(m);
+ expected.add(s + ":cf1:cq1:2");
+ }
+
+ bw.close();
+ connector.tableOperations().flush(tableName, null, null, true);
+
+ verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY));
+
+ connector.tableOperations().offline(tableName, true);
+ // give root write permission on the metadata table so the file entries can be rewritten below
+ connector.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+
+ Scanner metaScanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ metaScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+
+ BatchWriter mbw = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ // replace each file entry that starts with v1 by a relative "/<parent dir>/<file name>" entry with the same value
+ for (Entry<Key,Value> entry : metaScanner) {
+ String cq = entry.getKey().getColumnQualifier().toString();
+ if (cq.startsWith(v1.toString())) {
+ Path path = new Path(cq);
+ String relPath = "/" + path.getParent().getName() + "/" + path.getName();
+ Mutation fileMut = new Mutation(entry.getKey().getRow());
+ fileMut.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
+ fileMut.put(entry.getKey().getColumnFamily().toString(), relPath, entry.getValue().toString());
+ mbw.addMutation(fileMut);
+ }
+ }
+
+ mbw.close();
+
+ connector.tableOperations().online(tableName, true);
+
+ verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY));
+
+ connector.tableOperations().compact(tableName, null, null, true, true);
+
+ verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY));
+ // after the compaction, no file entry may still be a short relative path (path depth of 2 or less)
+ for (Entry<Key,Value> entry : metaScanner) {
+ String cq = entry.getKey().getColumnQualifier().toString();
+ Path path = new Path(cq);
+ Assert.assertTrue("relative path not deleted " + path.toString(), path.depth() > 2);
+ }
+
+ }
+
+  // Adds v3 to the instance volumes of a stopped cluster via "Initialize --add-volumes", then checks that all three volumes carry the instance id.
+  @Test
+  public void testAddVolumes() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+
+    // grab this before shutting down cluster
+    String uuid = new ZooKeeperInstance(cluster.getClientConfig()).getInstanceID();
+
+    verifyVolumesUsed(tableNames[0], false, v1, v2);
+
+    Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+    cluster.stop();
+
+    Configuration conf = new Configuration(false);
+    conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+
+    File v3f = new File(volDirBase, "v3");
+    assertTrue(v3f.mkdir() || v3f.isDirectory());
+    Path v3 = new Path("file://" + v3f.getAbsolutePath());
+
+    conf.set(Property.INSTANCE_VOLUMES.getKey(), v1.toString() + "," + v2.toString() + "," + v3.toString());
+    try (BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")))) {
+      conf.writeXml(fos); // the stream is closed even if writing the site file fails
+    }
+
+    // initialize volume
+    Assert.assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").waitFor());
+
+    // check that all volumes are initialized
+    for (Path volumePath : Arrays.asList(v1, v2, v3)) {
+      FileSystem fs = volumePath.getFileSystem(CachedConfiguration.getInstance());
+      Path vp = new Path(volumePath, ServerConstants.INSTANCE_ID_DIR);
+      FileStatus[] iids = fs.listStatus(vp);
+      Assert.assertEquals(1, iids.length);
+      Assert.assertEquals(uuid, iids[0].getPath().getName());
+    }
+
+    // start cluster and verify that new volume is used
+    cluster.start();
+
+    verifyVolumesUsed(tableNames[1], false, v1, v2, v3);
+  }
+
+  // Replaces v1 by v3 in the configured instance volumes, then checks the first table is still readable and a new table only uses v2 and v3.
+  @Test
+  public void testNonConfiguredVolumes() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+
+    // grab this before shutting down cluster
+    String uuid = new ZooKeeperInstance(cluster.getClientConfig()).getInstanceID();
+
+    verifyVolumesUsed(tableNames[0], false, v1, v2);
+
+    Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+    cluster.stop();
+
+    Configuration conf = new Configuration(false);
+    conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+
+    File v3f = new File(volDirBase, "v3");
+    assertTrue(v3f.mkdir() || v3f.isDirectory());
+    Path v3 = new Path("file://" + v3f.getAbsolutePath());
+
+    conf.set(Property.INSTANCE_VOLUMES.getKey(), v2.toString() + "," + v3.toString());
+    try (BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")))) {
+      conf.writeXml(fos); // the stream is closed even if writing the site file fails
+    }
+
+    // initialize volume
+    Assert.assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").waitFor());
+
+    // check that all volumes are initialized
+    for (Path volumePath : Arrays.asList(v1, v2, v3)) {
+      FileSystem fs = volumePath.getFileSystem(CachedConfiguration.getInstance());
+      Path vp = new Path(volumePath, ServerConstants.INSTANCE_ID_DIR);
+      FileStatus[] iids = fs.listStatus(vp);
+      Assert.assertEquals(1, iids.length);
+      Assert.assertEquals(uuid, iids[0].getPath().getName());
+    }
+
+    // start cluster and verify that new volume is used
+    cluster.start();
+
+    // Make sure we can still read the tables (tableNames[0] is very likely to have a file still on v1)
+    List<String> expected = new ArrayList<String>();
+    for (int i = 0; i < 100; i++) {
+      String row = String.format("%06d", i * 100 + 3);
+      expected.add(row + ":cf1:cq1:1");
+    }
+
+    verifyData(expected, getConnector().createScanner(tableNames[0], Authorizations.EMPTY));
+
+    // v1 should not have any data for tableNames[1]
+    verifyVolumesUsed(tableNames[1], false, v2, v3);
+  }
+
+  /** Creates the table with 99 split points (000100 to 009900) and writes 100 rows to it, each with the single cell cf1:cq1 = 1. */
+  private void writeData(String tableName, Connector conn) throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException,
+      MutationsRejectedException {
+    TreeSet<Text> splitPoints = new TreeSet<Text>();
+    for (int n = 1; n < 100; n++) {
+      splitPoints.add(new Text(String.format("%06d", n * 100)));
+    }
+
+    conn.tableOperations().create(tableName);
+    conn.tableOperations().addSplits(tableName, splitPoints);
+
+    // Row n * 100 + 3 sorts just after split point n * 100, so each of the 100 tablets receives exactly one row
+    BatchWriter writer = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    for (int n = 0; n < 100; n++) {
+      Mutation mutation = new Mutation(String.format("%06d", n * 100 + 3));
+      mutation.put("cf1", "cq1", "1");
+      writer.addMutation(mutation);
+    }
+    writer.close();
+  }
+
+ /**
+ * Checks that {@code tableName} holds the expected data and that its metadata entries and the instance's write-ahead logs only reference the given
+ * volumes.
+ * <p>
+ * If the table does not exist yet, {@code shouldExist} must be false; the table is then created and populated through {@code writeData} and flushed.
+ * Every path in {@code paths} must be referenced by at least one metadata entry, and exactly 200 metadata entries are expected in total.
+ *
+ * @param tableName
+ *          table to check (created on demand)
+ * @param shouldExist
+ *          whether the caller expects the table to exist already
+ * @param paths
+ *          the only volumes that may appear in the table's metadata and in the WAL state
+ */
+ private void verifyVolumesUsed(String tableName, boolean shouldExist, Path... paths) throws Exception {
+
+ Connector conn = getConnector();
+
+ // expected scan results, mirroring the rows written by writeData()
+ List<String> expected = new ArrayList<String>();
+ for (int i = 0; i < 100; i++) {
+ String row = String.format("%06d", i * 100 + 3);
+ expected.add(row + ":cf1:cq1:1");
+ }
+
+ if (!conn.tableOperations().exists(tableName)) {
+ // a missing table is only acceptable when the caller did not expect it to exist
+ Assert.assertFalse(shouldExist);
+
+ writeData(tableName, conn);
+
+ verifyData(expected, conn.createScanner(tableName, Authorizations.EMPTY));
+
+ conn.tableOperations().flush(tableName, null, null, true);
+ }
+
+ verifyData(expected, conn.createScanner(tableName, Authorizations.EMPTY));
+
+ // scan the metadata table for this table's directory and data file entries
+ String tableId = conn.tableOperations().tableIdMap().get(tableName);
+ Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(metaScanner);
+ metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ metaScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+
+ // counts[i] is the number of metadata entries whose path starts with paths[i]
+ int counts[] = new int[paths.length];
+
+ outer: for (Entry<Key,Value> entry : metaScanner) {
+ String cf = entry.getKey().getColumnFamily().toString();
+ String cq = entry.getKey().getColumnQualifier().toString();
+
+ // data file entries carry the path in the column qualifier; the other fetched entries (directory column) carry it in the value
+ String path;
+ if (cf.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME.toString()))
+ path = cq;
+ else
+ path = entry.getValue().toString();
+
+ for (int i = 0; i < paths.length; i++) {
+ if (path.startsWith(paths[i].toString())) {
+ counts[i]++;
+ continue outer;
+ }
+ }
+
+ // reached only when the path matched none of the allowed volumes
+ Assert.fail("Unexpected volume " + path);
+ }
+
+ // every write-ahead log known to the WAL state manager must also live on one of the allowed volumes
+ Instance i = conn.getInstance();
+ ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), "");
+ WalStateManager wals = new WalStateManager(i, zk);
+ outer: for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+ for (Path path : paths) {
+ if (entry.getKey().toString().startsWith(path.toString())) {
+ continue outer;
+ }
+ }
+ Assert.fail("Unexpected volume " + entry.getKey());
+ }
+
+ // if a volume is chosen randomly for each tablet, then the probability that a volume will not be chosen for any tablet is ((num_volumes -
+ // 1)/num_volumes)^num_tablets. For 100 tablets and 3 volumes the probability that only 2 volumes would be chosen is 2.46e-18
+
+ int sum = 0;
+ for (int count : counts) {
+ Assert.assertTrue(count > 0);
+ sum += count;
+ }
+
+ // NOTE(review): 200 presumably corresponds to one directory entry plus one data file entry for each of the 100 tablets - confirm
+ Assert.assertEquals(200, sum);
+
+ }
+
+ /**
+ * Removes volume v1 from the instance configuration and verifies that table data, the root tablet directory and a cloned table end up referencing
+ * only v2.
+ */
+ @Test
+ public void testRemoveVolumes() throws Exception {
+ String[] tableNames = getUniqueNames(2);
+
+ // creates and populates the first table; its metadata must reference both v1 and v2
+ verifyVolumesUsed(tableNames[0], false, v1, v2);
+
+ Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+ cluster.stop();
+
+ // rewrite accumulo-site.xml so that v2 is the only configured volume
+ Configuration conf = new Configuration(false);
+ conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+
+ conf.set(Property.INSTANCE_VOLUMES.getKey(), v2.toString());
+ BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")));
+ conf.writeXml(fos);
+ fos.close();
+
+ // start cluster and verify that volume was decommissioned
+ cluster.start();
+
+ // NOTE(review): the compaction presumably rewrites the table's files onto the remaining volume - confirm
+ Connector conn = cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD));
+ conn.tableOperations().compact(tableNames[0], null, null, true, true);
+
+ verifyVolumesUsed(tableNames[0], true, v2);
+
+ // check that root tablet is not on volume 1
+ ZooReader zreader = new ZooReader(cluster.getZooKeepers(), 30000);
+ String zpath = ZooUtil.getRoot(new ZooKeeperInstance(cluster.getClientConfig())) + RootTable.ZROOT_TABLET_PATH;
+ String rootTabletDir = new String(zreader.getData(zpath, false, null), UTF_8);
+ Assert.assertTrue(rootTabletDir.startsWith(v2.toString()));
+
+ // a clone of the table must also only reference v2
+ conn.tableOperations().clone(tableNames[0], tableNames[1], true, new HashMap<String,String>(), new HashSet<String>());
+
+ conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+ conn.tableOperations().flush(RootTable.NAME, null, null, true);
+
+ verifyVolumesUsed(tableNames[0], true, v2);
+ verifyVolumesUsed(tableNames[1], true, v2);
+
+ }
+
+ /**
+ * Renames the directories backing volumes v1 and v2 to v8 and v9 while the cluster is stopped, configures the instance with the new volumes and the
+ * matching volume replacements, and verifies after restart that tables, the root tablet directory and a cloned table only reference v8 and v9.
+ *
+ * @param cleanShutdown
+ *          when true, "stopAll" is run through Admin (and must exit with 0) before the cluster is stopped; when false the cluster is stopped directly
+ */
+ private void testReplaceVolume(boolean cleanShutdown) throws Exception {
+ String[] tableNames = getUniqueNames(3);
+
+ verifyVolumesUsed(tableNames[0], false, v1, v2);
+
+ // write to 2nd table, but do not flush data to disk before shutdown
+ writeData(tableNames[1], cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD)));
+
+ if (cleanShutdown)
+ Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+
+ cluster.stop();
+
+ // move the volume directories on the local filesystem: v1 -> v8, v2 -> v9 (siblings in the same parent directory)
+ File v1f = new File(v1.toUri());
+ File v8f = new File(new File(v1.getParent().toUri()), "v8");
+ Assert.assertTrue("Failed to rename " + v1f + " to " + v8f, v1f.renameTo(v8f));
+ Path v8 = new Path(v8f.toURI());
+
+ File v2f = new File(v2.toUri());
+ File v9f = new File(new File(v2.getParent().toUri()), "v9");
+ Assert.assertTrue("Failed to rename " + v2f + " to " + v9f, v2f.renameTo(v9f));
+ Path v9 = new Path(v9f.toURI());
+
+ // rewrite accumulo-site.xml: new volume list plus "old new" replacement pairs separated by commas
+ Configuration conf = new Configuration(false);
+ conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+
+ conf.set(Property.INSTANCE_VOLUMES.getKey(), v8 + "," + v9);
+ conf.set(Property.INSTANCE_VOLUMES_REPLACEMENTS.getKey(), v1 + " " + v8 + "," + v2 + " " + v9);
+ BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")));
+ conf.writeXml(fos);
+ fos.close();
+
+ // start cluster and verify that volumes were replaced
+ cluster.start();
+
+ verifyVolumesUsed(tableNames[0], true, v8, v9);
+ verifyVolumesUsed(tableNames[1], true, v8, v9);
+
+ // verify writes to new dir
+ getConnector().tableOperations().compact(tableNames[0], null, null, true, true);
+ getConnector().tableOperations().compact(tableNames[1], null, null, true, true);
+
+ verifyVolumesUsed(tableNames[0], true, v8, v9);
+ verifyVolumesUsed(tableNames[1], true, v8, v9);
+
+ // check that root tablet is not on volume 1 or 2
+ ZooReader zreader = new ZooReader(cluster.getZooKeepers(), 30000);
+ String zpath = ZooUtil.getRoot(new ZooKeeperInstance(cluster.getClientConfig())) + RootTable.ZROOT_TABLET_PATH;
+ String rootTabletDir = new String(zreader.getData(zpath, false, null), UTF_8);
+ Assert.assertTrue(rootTabletDir.startsWith(v8.toString()) || rootTabletDir.startsWith(v9.toString()));
+
+ // a clone of the second table must also only reference the replacement volumes
+ getConnector().tableOperations().clone(tableNames[1], tableNames[2], true, new HashMap<String,String>(), new HashSet<String>());
+
+ getConnector().tableOperations().flush(MetadataTable.NAME, null, null, true);
+ getConnector().tableOperations().flush(RootTable.NAME, null, null, true);
+
+ verifyVolumesUsed(tableNames[0], true, v8, v9);
+ verifyVolumesUsed(tableNames[1], true, v8, v9);
+ verifyVolumesUsed(tableNames[2], true, v8, v9);
+ }
+
+ /**
+  * Runs the volume replacement scenario with a clean shutdown before the volumes are moved.
+  */
+ @Test
+ public void testCleanReplaceVolumes() throws Exception {
+   final boolean cleanShutdown = true;
+   testReplaceVolume(cleanShutdown);
+ }
+
+ /**
+  * Runs the volume replacement scenario without a clean shutdown before the volumes are moved.
+  */
+ @Test
+ public void testDirtyReplaceVolumes() throws Exception {
+   final boolean cleanShutdown = false;
+   testReplaceVolume(cleanShutdown);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java b/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java
new file mode 100644
index 0000000..249bf14
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Verifies that {@code waitForBalance()} returns only once tablets are spread evenly over the tablet servers, both on a fresh instance and after a
+ * table has been split into many tablets.
+ */
+public class WaitForBalanceIT extends ConfigurableMacBase {
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    final Connector conn = getConnector();
+    // ensure the metadata table is online
+    Iterators.size(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+    conn.instanceOperations().waitForBalance();
+    assertTrue(isBalanced());
+
+    final String tableName = getUniqueNames(1)[0];
+    conn.tableOperations().create(tableName);
+    conn.instanceOperations().waitForBalance();
+
+    // split the new table at "0" .. "999"
+    final SortedSet<Text> splitPoints = new TreeSet<Text>();
+    for (int i = 0; i < 1000; i++) {
+      splitPoints.add(new Text(Integer.toString(i)));
+    }
+    conn.tableOperations().addSplits(tableName, splitPoints);
+
+    assertFalse(isBalanced());
+    conn.instanceOperations().waitForBalance();
+    assertTrue(isBalanced());
+  }
+
+  /**
+   * Counts hosted tablets per tablet server by scanning the tablet sections of the metadata and root tables.
+   *
+   * @return false when more than one tablet has no current location, or when any server's tablet count differs from the (integer) average by more
+   *         than the number of tables; true otherwise
+   */
+  private boolean isBalanced() throws Exception {
+    final Map<String,Integer> tabletsPerServer = new HashMap<String,Integer>();
+    int unhosted = 0;
+    final Connector conn = getConnector();
+    final String[] tablesToScan = {MetadataTable.NAME, RootTable.NAME};
+
+    for (String source : tablesToScan) {
+      final Scanner scanner = conn.createScanner(source, Authorizations.EMPTY);
+      scanner.setRange(MetadataSchema.TabletsSection.getRange());
+      scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+      MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
+
+      // location seen for the tablet currently being read; reset once its prev-row entry has been consumed
+      String currentServer = null;
+      for (Entry<Key,Value> entry : scanner) {
+        final Key key = entry.getKey();
+        if (key.getColumnFamily().equals(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME)) {
+          currentServer = key.getColumnQualifier().toString();
+          continue;
+        }
+        if (!MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
+          continue;
+        }
+        if (currentServer == null) {
+          unhosted++;
+        } else {
+          final Integer soFar = tabletsPerServer.get(currentServer);
+          tabletsPerServer.put(currentServer, soFar == null ? 1 : soFar + 1);
+        }
+        currentServer = null;
+      }
+    }
+
+    // the replication table is expected to be offline for this test, so ignore it
+    if (unhosted > 1) {
+      System.out.println("Offline tablets " + unhosted);
+      return false;
+    }
+
+    int total = 0;
+    for (Integer hosted : tabletsPerServer.values()) {
+      total += hosted;
+    }
+    final int average = total / tabletsPerServer.size();
+    System.out.println(tabletsPerServer);
+
+    final int tablesCount = conn.tableOperations().list().size();
+    for (Entry<String,Integer> serverEntry : tabletsPerServer.entrySet()) {
+      final int hosted = serverEntry.getValue();
+      if (Math.abs(average - hosted) > tablesCount) {
+        System.out.println("Average " + average + " count " + serverEntry.getKey() + ": " + hosted);
+        return false;
+      }
+    }
+    return true;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
new file mode 100644
index 0000000..118f053
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloInputFormatIT.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.lang.System.currentTimeMillis;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test for the split computation of {@link AccumuloInputFormat} against a single tablet server cluster.
+ */
+public class AccumuloInputFormatIT extends AccumuloClusterHarness {
+
+  AccumuloInputFormat inputFormat;
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 4 * 60;
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+  }
+
+  @Before
+  public void before() {
+    inputFormat = new AccumuloInputFormat();
+  }
+
+  /**
+   * Tests several different paths through the getSplits() method by setting different properties and verifying the results.
+   */
+  @Test
+  public void testGetSplits() throws Exception {
+    Connector conn = getConnector();
+    String table = getUniqueNames(1)[0];
+    conn.tableOperations().create(table);
+    insertData(table, currentTimeMillis());
+
+    ClientConfiguration clientConf = cluster.getClientConfig();
+    AccumuloConfiguration clusterClientConf = new ConfigurationCopy(new DefaultConfiguration());
+
+    // Pass SSL and CredentialProvider options into the ClientConfiguration given to AccumuloInputFormat
+    boolean sslEnabled = Boolean.parseBoolean(clusterClientConf.get(Property.INSTANCE_RPC_SSL_ENABLED));
+    if (sslEnabled) {
+      ClientProperty[] sslProperties = new ClientProperty[] {ClientProperty.INSTANCE_RPC_SSL_ENABLED, ClientProperty.INSTANCE_RPC_SSL_CLIENT_AUTH,
+          ClientProperty.RPC_SSL_KEYSTORE_PATH, ClientProperty.RPC_SSL_KEYSTORE_TYPE, ClientProperty.RPC_SSL_KEYSTORE_PASSWORD,
+          ClientProperty.RPC_SSL_TRUSTSTORE_PATH, ClientProperty.RPC_SSL_TRUSTSTORE_TYPE, ClientProperty.RPC_SSL_TRUSTSTORE_PASSWORD,
+          ClientProperty.RPC_USE_JSSE, ClientProperty.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS};
+
+      for (ClientProperty prop : sslProperties) {
+        // The default property is returned if it's not in the ClientConfiguration so we don't have to check if the value is actually defined
+        clientConf.setProperty(prop, clusterClientConf.get(prop.getKey()));
+      }
+    }
+
+    Job job = Job.getInstance();
+    AccumuloInputFormat.setInputTableName(job, table);
+    AccumuloInputFormat.setZooKeeperInstance(job, clientConf);
+    AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+
+    // split table
+    TreeSet<Text> splitsToAdd = new TreeSet<Text>();
+    for (int i = 0; i < 10000; i += 1000) {
+      splitsToAdd.add(new Text(String.format("%09d", i)));
+    }
+    conn.tableOperations().addSplits(table, splitsToAdd);
+    UtilWaitThread.sleep(500); // wait for splits to be propagated
+
+    // get splits without setting any range
+    Collection<Text> actualSplits = conn.tableOperations().listSplits(table);
+    List<InputSplit> splits = inputFormat.getSplits(job);
+    assertEquals(actualSplits.size() + 1, splits.size()); // No ranges set on the job so it'll start with -inf
+
+    // set ranges and get splits
+    List<Range> ranges = new ArrayList<Range>();
+    for (Text text : actualSplits) {
+      ranges.add(new Range(text));
+    }
+    AccumuloInputFormat.setRanges(job, ranges);
+    splits = inputFormat.getSplits(job);
+    assertEquals(actualSplits.size(), splits.size());
+
+    // offline mode
+    AccumuloInputFormat.setOfflineTableScan(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IOException expected) {
+      // expected: an offline scan was requested while the table is still online
+    }
+
+    conn.tableOperations().offline(table, true);
+    splits = inputFormat.getSplits(job);
+    assertEquals(actualSplits.size(), splits.size());
+
+    // auto adjust ranges
+    ranges = new ArrayList<Range>();
+    for (int i = 0; i < 5; i++) {
+      // overlapping ranges
+      ranges.add(new Range(String.format("%09d", i), String.format("%09d", i + 2)));
+    }
+    AccumuloInputFormat.setRanges(job, ranges);
+    splits = inputFormat.getSplits(job);
+    assertEquals(2, splits.size());
+
+    AccumuloInputFormat.setAutoAdjustRanges(job, false);
+    splits = inputFormat.getSplits(job);
+    assertEquals(ranges.size(), splits.size());
+
+    // BatchScan not available for offline scans
+    AccumuloInputFormat.setBatchScan(job, true);
+    // Reset auto-adjust ranges too
+    AccumuloInputFormat.setAutoAdjustRanges(job, true);
+
+    AccumuloInputFormat.setOfflineTableScan(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IllegalArgumentException expected) {
+      // expected: batch scan combined with an offline table scan is rejected
+    }
+
+    conn.tableOperations().online(table, true);
+    AccumuloInputFormat.setOfflineTableScan(job, false);
+
+    // test for resumption of success
+    splits = inputFormat.getSplits(job);
+    assertEquals(2, splits.size());
+
+    // BatchScan not available with isolated iterators
+    AccumuloInputFormat.setScanIsolation(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IllegalArgumentException expected) {
+      // expected: batch scan combined with scan isolation is rejected
+    }
+    AccumuloInputFormat.setScanIsolation(job, false);
+
+    // test for resumption of success
+    splits = inputFormat.getSplits(job);
+    assertEquals(2, splits.size());
+
+    // BatchScan not available with local iterators
+    AccumuloInputFormat.setLocalIterators(job, true);
+    try {
+      inputFormat.getSplits(job);
+      fail("An exception should have been thrown");
+    } catch (IllegalArgumentException expected) {
+      // expected: batch scan combined with local iterators is rejected
+    }
+    AccumuloInputFormat.setLocalIterators(job, false);
+
+    // Check we are getting back correct type of split
+    conn.tableOperations().online(table);
+    splits = inputFormat.getSplits(job);
+    for (InputSplit split : splits) {
+      // use a JUnit failure rather than the assert keyword, which is skipped when the JVM runs without -ea
+      if (!(split instanceof BatchInputSplit)) {
+        fail("Expected a BatchInputSplit but got " + (split == null ? "null" : split.getClass().getName()));
+      }
+    }
+
+    // We should divide along the tablet lines similar to when using `setAutoAdjustRanges(job, true)`
+    assertEquals(2, splits.size());
+  }
+
+  /**
+   * Writes rows 000000000 through 000009999 to {@code tableName}, each with a single cf1:cq1 entry at timestamp {@code ts} whose value is the row
+   * number.
+   */
+  private void insertData(String tableName, long ts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    BatchWriter bw = getConnector().createBatchWriter(tableName, null);
+
+    for (int i = 0; i < 10000; i++) {
+      String row = String.format("%09d", i);
+
+      Mutation m = new Mutation(new Text(row));
+      // encode explicitly as UTF-8 instead of relying on the platform default charset
+      m.put(new Text("cf1"), new Text("cq1"), ts, new Value(Integer.toString(i).getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+      bw.addMutation(m);
+    }
+    bw.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java
new file mode 100644
index 0000000..4b4aeac
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AddSplitIT.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class AddSplitIT extends AccumuloClusterHarness {
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Test
+ public void addSplitTest() throws Exception {
+
+ String tableName = getUniqueNames(1)[0];
+ Connector c = getConnector();
+ c.tableOperations().create(tableName);
+
+ insertData(tableName, 1l);
+
+ TreeSet<Text> splits = new TreeSet<Text>();
+ splits.add(new Text(String.format("%09d", 333)));
+ splits.add(new Text(String.format("%09d", 666)));
+
+ c.tableOperations().addSplits(tableName, splits);
+
+ UtilWaitThread.sleep(100);
+
+ Collection<Text> actualSplits = c.tableOperations().listSplits(tableName);
+
+ if (!splits.equals(new TreeSet<Text>(actualSplits))) {
+ throw new Exception(splits + " != " + actualSplits);
+ }
+
+ verifyData(tableName, 1l);
+ insertData(tableName, 2l);
+
+ // did not clear splits on purpose, it should ignore existing split points
+ // and still create the three additional split points
+
+ splits.add(new Text(String.format("%09d", 200)));
+ splits.add(new Text(String.format("%09d", 500)));
+ splits.add(new Text(String.format("%09d", 800)));
+
+ c.tableOperations().addSplits(tableName, splits);
+
+ UtilWaitThread.sleep(100);
+
+ actualSplits = c.tableOperations().listSplits(tableName);
+
+ if (!splits.equals(new TreeSet<Text>(actualSplits))) {
+ throw new Exception(splits + " != " + actualSplits);
+ }
+
+ verifyData(tableName, 2l);
+ }
+
+ private void verifyData(String tableName, long ts) throws Exception {
+ Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY);
+
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+ for (int i = 0; i < 10000; i++) {
+ if (!iter.hasNext()) {
+ throw new Exception("row " + i + " not found");
+ }
+
+ Entry<Key,Value> entry = iter.next();
+
+ String row = String.format("%09d", i);
+
+ if (!entry.getKey().getRow().equals(new Text(row))) {
+ throw new Exception("unexpected row " + entry.getKey() + " " + i);
+ }
+
+ if (entry.getKey().getTimestamp() != ts) {
+ throw new Exception("unexpected ts " + entry.getKey() + " " + ts);
+ }
+
+ if (Integer.parseInt(entry.getValue().toString()) != i) {
+ throw new Exception("unexpected value " + entry + " " + i);
+ }
+ }
+
+ if (iter.hasNext()) {
+ throw new Exception("found more than expected " + iter.next());
+ }
+
+ }
+
+ private void insertData(String tableName, long ts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException {
+ BatchWriter bw = getConnector().createBatchWriter(tableName, null);
+
+ for (int i = 0; i < 10000; i++) {
+ String row = String.format("%09d", i);
+
+ Mutation m = new Mutation(new Text(row));
+ m.put(new Text("cf1"), new Text("cq1"), ts, new Value(Integer.toString(i).getBytes(UTF_8)));
+ bw.addMutation(m);
+ }
+
+ bw.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BackupMasterIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BackupMasterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BackupMasterIT.java
new file mode 100644
index 0000000..d8979db
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BackupMasterIT.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.master.Master;
+import org.junit.Test;
+
+/**
+ * Starts a second master process, removes the lock node of the first lock holder in ZooKeeper, and verifies that a table can still be created
+ * afterwards (i.e. the backup took over).
+ */
+public class BackupMasterIT extends ConfigurableMacBase {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 120;
+  }
+
+  @Test
+  public void test() throws Exception {
+    // wait for master
+    UtilWaitThread.sleep(1000);
+    // create a backup
+    Process backup = exec(Master.class);
+    try {
+      // encode the digest credentials explicitly as UTF-8 instead of relying on the platform default charset
+      ZooReaderWriter writer = new ZooReaderWriter(cluster.getZooKeepers(), 30 * 1000, "digest",
+          "accumulo:DONTTELL".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+      String root = "/accumulo/" + getConnector().getInstance().getInstanceID();
+      List<String> children = Collections.emptyList();
+      // wait for 2 lock entries
+      do {
+        UtilWaitThread.sleep(100);
+        children = writer.getChildren(root + "/masters/lock");
+      } while (children.size() != 2);
+      // after sorting, the first child is treated as the lock of the active master
+      Collections.sort(children);
+      // wait for the backup master to learn to be the backup
+      UtilWaitThread.sleep(1000);
+      // generate a false zookeeper event
+      String lockPath = root + "/masters/lock/" + children.get(0);
+      byte[] data = writer.getData(lockPath, null);
+      writer.getZooKeeper().setData(lockPath, data, -1);
+      // let it propagate
+      UtilWaitThread.sleep(500);
+      // kill the master by removing its lock
+      writer.recursiveDelete(lockPath, NodeMissingPolicy.FAIL);
+      // ensure the backup becomes the master
+      getConnector().tableOperations().create(getUniqueNames(1)[0]);
+    } finally {
+      backup.destroy();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
new file mode 100644
index 0000000..4c6fc00
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+import java.util.EnumSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Attaches a failing iterator at minor compaction scope and verifies that the flush produces no file while data stays scannable, that the flush
+ * completes once the iterator is removed, and that a table with a failing flush can still be deleted.
+ */
+public class BadIteratorMincIT extends AccumuloClusterHarness {
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 60;
+  }
+
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    IteratorSetting is = new IteratorSetting(30, BadIterator.class);
+    c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc));
+    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+
+    Mutation m = new Mutation(new Text("r1"));
+    m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(UTF_8)));
+
+    bw.addMutation(m);
+    bw.close();
+
+    // request the flush without waiting for it, then give it time to be attempted
+    c.tableOperations().flush(tableName, null, null, false);
+    UtilWaitThread.sleep(1000);
+
+    // minc should fail, so there should be no files
+    FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 0, 0);
+
+    // try to scan table
+    Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
+    int count = Iterators.size(scanner.iterator());
+    assertEquals("Did not see expected # entries " + count, 1, count);
+
+    // remove the bad iterator
+    c.tableOperations().removeIterator(tableName, BadIterator.class.getSimpleName(), EnumSet.of(IteratorScope.minc));
+
+    UtilWaitThread.sleep(5000);
+
+    // minc should complete
+    FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 1, 1);
+
+    // report a wrong count as a JUnit assertion failure, consistent with the first scan check above
+    count = Iterators.size(scanner.iterator());
+    assertEquals("Did not see expected # entries " + count, 1, count);
+
+    // now try putting bad iterator back and deleting the table
+    c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc));
+    bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+    m = new Mutation(new Text("r2"));
+    m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(UTF_8)));
+    bw.addMutation(m);
+    bw.close();
+
+    // make sure property is given time to propagate
+    UtilWaitThread.sleep(500);
+
+    c.tableOperations().flush(tableName, null, null, false);
+
+    // make sure the flush has time to start
+    UtilWaitThread.sleep(1000);
+
+    // this should not hang
+    c.tableOperations().delete(tableName);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
new file mode 100644
index 0000000..ae470f6
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BalanceAfterCommsFailureIT.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Pauses every tablet server process with SIGSTOP for 20 seconds while the general RPC timeout is configured to 2 seconds, resumes the processes, adds splits
+ * to a table and then checks that the master reports no unassigned tablets and a similar number of online tablets on every tablet server.
+ */
+public class BalanceAfterCommsFailureIT extends ConfigurableMacBase {
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // much shorter than the 20 second pause applied to the tablet servers in test()
+    cfg.setProperty(Property.GENERAL_RPC_TIMEOUT, "2s");
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  /**
+   * Stops and resumes all tablet servers, splits the table "test" into 27 tablets and verifies the resulting balance. The test returns early, without
+   * failing, when a tablet server process is not a {@code java.lang.UNIXProcess}, because the pid is read reflectively from that class.
+   */
+  @Test
+  public void test() throws Exception {
+    Connector c = this.getConnector();
+    c.tableOperations().create("test");
+    Collection<ProcessReference> tservers = getCluster().getProcesses().get(ServerType.TABLET_SERVER);
+    List<Integer> tserverPids = new ArrayList<Integer>(tservers.size());
+    for (ProcessReference tserver : tservers) {
+      Process p = tserver.getProcess();
+      if (!p.getClass().getName().equals("java.lang.UNIXProcess")) {
+        log.info("Found process that was not UNIXProcess, exiting test");
+        return;
+      }
+
+      // the pid is only available through the private "pid" field of the process implementation
+      Field f = p.getClass().getDeclaredField("pid");
+      f.setAccessible(true);
+      tserverPids.add(f.getInt(p));
+    }
+
+    boolean allResumed = true;
+    try {
+      for (int pid : tserverPids) {
+        assertEquals(0, signal("-SIGSTOP", pid));
+      }
+      UtilWaitThread.sleep(20 * 1000);
+    } finally {
+      // Resume every tablet server even when stopping one of them failed or the sleep was cut short, so that no process is left frozen. The outcome is
+      // asserted after the loop so that one failing kill does not prevent the remaining servers from being resumed.
+      for (int pid : tserverPids) {
+        if (signal("-SIGCONT", pid) != 0) {
+          allResumed = false;
+        }
+      }
+    }
+    assertTrue("Failed to resume every tablet server with SIGCONT", allResumed);
+
+    SortedSet<Text> splits = new TreeSet<Text>();
+    for (String split : "a b c d e f g h i j k l m n o p q r s t u v w x y z".split(" ")) {
+      splits.add(new Text(split));
+    }
+    c.tableOperations().addSplits("test", splits);
+    // Ensure all of the tablets are actually assigned
+    assertEquals(0, Iterables.size(c.createScanner("test", Authorizations.EMPTY)));
+    UtilWaitThread.sleep(30 * 1000);
+    checkBalance(c);
+  }
+
+  /**
+   * Runs {@code kill <signalName> <pid>} and waits for the command to finish.
+   *
+   * @param signalName
+   *          the signal option handed to kill, for example "-SIGSTOP"
+   * @param pid
+   *          the id of the process to signal
+   * @return the exit value of the kill command
+   */
+  private static int signal(String signalName, int pid) throws Exception {
+    return Runtime.getRuntime().exec(new String[] {"kill", signalName, Integer.toString(pid)}).waitFor();
+  }
+
+  /**
+   * Fetches the master statistics, retrying up to 10 times with a 3 second pause while tablets are still unassigned. Afterwards asserts that no tablet is
+   * unassigned, that at least two tablet servers are reported, and that the online tablet count of every tablet server differs from the count of the first
+   * one by no more than the number of tablet servers.
+   *
+   * @param c
+   *          connector whose instance is used to contact the master
+   */
+  private void checkBalance(Connector c) throws Exception {
+    Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD));
+    ClientContext context = new ClientContext(c.getInstance(), creds, getClientConfig());
+
+    MasterMonitorInfo stats = null;
+    // start above zero so that the statistics are fetched at least once
+    int unassignedTablets = 1;
+    for (int i = 0; unassignedTablets > 0 && i < 10; i++) {
+      MasterClientService.Iface client = null;
+      try {
+        client = MasterClient.getConnectionWithRetry(context);
+        stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+      } finally {
+        if (client != null) {
+          MasterClient.close(client);
+        }
+      }
+      unassignedTablets = stats.getUnassignedTablets();
+      if (unassignedTablets > 0) {
+        log.info("Found " + unassignedTablets + " unassigned tablets, sleeping 3 seconds for tablet assignment");
+        Thread.sleep(3000);
+      }
+    }
+
+    assertEquals("Unassigned tablets were not assigned within 30 seconds", 0, unassignedTablets);
+
+    // total number of online tablets per tablet server, summed over all tables
+    List<Integer> counts = new ArrayList<Integer>();
+    for (TabletServerStatus server : stats.tServerInfo) {
+      int count = 0;
+      for (TableInfo table : server.tableMap.values()) {
+        count += table.onlineTablets;
+      }
+      counts.add(count);
+    }
+    assertTrue("Expected to have at least two TabletServers", counts.size() > 1);
+    for (int i = 1; i < counts.size(); i++) {
+      int diff = Math.abs(counts.get(0) - counts.get(i));
+      assertTrue("Expected difference in tablets to be less than or equal to " + counts.size() + " but was " + diff + ". Counts " + counts,
+          diff <= counts.size());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
new file mode 100644
index 0000000..623d79b
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates a table with many splits and takes it offline before its tablets can be rebalanced, then ingests into a second table and verifies that the tablets
+ * reported by the master end up spread roughly evenly over the tablet servers.
+ */
+public class BalanceInPresenceOfOfflineTableIT extends AccumuloClusterHarness {
+
+  private static final Logger log = LoggerFactory.getLogger(BalanceInPresenceOfOfflineTableIT.class);
+
+  /**
+   * Limits tablet server memory to 10K, removes the major compaction delay and makes sure the mini cluster runs at least two tablet servers.
+   */
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    Map<String,String> siteConfig = cfg.getSiteConfig();
+    siteConfig.put(Property.TSERV_MAXMEM.getKey(), "10K");
+    siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "0");
+    cfg.setSiteConfig(siteConfig);
+    // ensure we have two tservers
+    if (cfg.getNumTservers() < 2) {
+      cfg.setNumTservers(2);
+    }
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 10 * 60;
+  }
+
+  private static final int NUM_SPLITS = 200;
+
+  private String UNUSED_TABLE, TEST_TABLE;
+
+  private Connector connector;
+
+  /**
+   * Skips the test when fewer than two tablet servers are available. Otherwise creates the unused table, adds {@link #NUM_SPLITS} splits to it, takes it
+   * offline, and creates the test table with a split threshold of 10K.
+   */
+  @Before
+  public void setupTables() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException {
+    Connector conn = getConnector();
+    // Need at least two tservers
+    Assume.assumeTrue("Not enough tservers to run test", conn.instanceOperations().getTabletServers().size() >= 2);
+
+    // set up splits
+    final SortedSet<Text> splits = new TreeSet<Text>();
+    for (int i = 0; i < NUM_SPLITS; i++) {
+      splits.add(new Text(String.format("%08x", i * 1000)));
+    }
+
+    String[] names = getUniqueNames(2);
+    UNUSED_TABLE = names[0];
+    TEST_TABLE = names[1];
+
+    // load into a table we won't use
+    connector = getConnector();
+    connector.tableOperations().create(UNUSED_TABLE);
+    connector.tableOperations().addSplits(UNUSED_TABLE, splits);
+    // mark the table offline before it can rebalance.
+    connector.tableOperations().offline(UNUSED_TABLE);
+
+    // actual test table
+    connector.tableOperations().create(TEST_TABLE);
+    connector.tableOperations().setProperty(TEST_TABLE, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+  }
+
+  /**
+   * Ingests and verifies 200000 rows in the test table, then polls the master statistics with a doubling back-off until at least two tablet servers are
+   * reported, no tablet is unassigned, the first server holds more than 10 tablets and the smallest per-server tablet count is at least half of the largest.
+   * Polling stops after 5 minutes and 15 seconds; the test fails if the tablets were not balanced by then.
+   */
+  @Test
+  public void test() throws Exception {
+    log.info("Test that balancing is not stopped by an offline table with outstanding migrations.");
+
+    log.debug("starting test ingestion");
+
+    TestIngest.Opts opts = new TestIngest.Opts();
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+    ClientConfiguration conf = cluster.getClientConfig();
+    if (conf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+      opts.updateKerberosCredentials(cluster.getClientConfig());
+      vopts.updateKerberosCredentials(cluster.getClientConfig());
+    } else {
+      opts.setPrincipal("root");
+      vopts.setPrincipal("root");
+    }
+    vopts.rows = opts.rows = 200000;
+    opts.setTableName(TEST_TABLE);
+    TestIngest.ingest(connector, opts, new BatchWriterOpts());
+    connector.tableOperations().flush(TEST_TABLE, null, null, true);
+    vopts.setTableName(TEST_TABLE);
+    VerifyIngest.verifyIngest(connector, vopts, new ScannerOpts());
+
+    log.debug("waiting for balancing, up to ~5 minutes to allow for migration cleanup.");
+    final long deadline = System.currentTimeMillis() + ((5 * 60 + 15) * 1000);
+    long currentWait = 10 * 1000;
+    boolean balancingWorked = false;
+
+    Credentials creds = new Credentials(getAdminPrincipal(), getAdminToken());
+    while (!balancingWorked) {
+      final long remaining = deadline - System.currentTimeMillis();
+      if (remaining <= 0) {
+        break;
+      }
+      // Never sleep past the deadline. The doubling waits add up to 310 seconds after five rounds, so an uncapped sixth wait of 320 seconds would exceed
+      // the 10 minute test timeout before the assertion below could report the failure.
+      Thread.sleep(Math.min(currentWait, remaining));
+      currentWait *= 2;
+
+      log.debug("fetch the list of tablets assigned to each tserver.");
+
+      MasterClientService.Iface client = null;
+      MasterMonitorInfo stats = null;
+      try {
+        Instance instance = new ZooKeeperInstance(cluster.getClientConfig());
+        client = MasterClient.getConnectionWithRetry(new ClientContext(instance, creds, cluster.getClientConfig()));
+        stats = client.getMasterStats(Tracer.traceInfo(), creds.toThrift(instance));
+      } catch (ThriftSecurityException exception) {
+        throw new AccumuloSecurityException(exception);
+      } catch (TException exception) {
+        throw new AccumuloException(exception);
+      } finally {
+        if (client != null) {
+          MasterClient.close(client);
+        }
+      }
+
+      if (stats.getTServerInfoSize() < 2) {
+        log.debug("we need >= 2 servers. sleeping for " + currentWait + "ms");
+        continue;
+      }
+      if (stats.getUnassignedTablets() != 0) {
+        log.debug("We shouldn't have unassigned tablets. sleeping for " + currentWait + "ms");
+        continue;
+      }
+
+      // number of tablets per tablet server, summed over all tables in the server's table map
+      long[] tabletsPerServer = new long[stats.getTServerInfoSize()];
+      Arrays.fill(tabletsPerServer, 0L);
+      for (int i = 0; i < stats.getTServerInfoSize(); i++) {
+        for (Map.Entry<String,TableInfo> entry : stats.getTServerInfo().get(i).getTableMap().entrySet()) {
+          tabletsPerServer[i] += entry.getValue().getTablets();
+        }
+      }
+
+      if (tabletsPerServer[0] <= 10) {
+        log.debug("We should have > 10 tablets. sleeping for " + currentWait + "ms");
+        continue;
+      }
+      long min = NumberUtils.min(tabletsPerServer);
+      long max = NumberUtils.max(tabletsPerServer);
+      log.debug("Min=" + min + ", Max=" + max);
+      if ((min / ((double) max)) < 0.5) {
+        log.debug("ratio of min to max tablets per server should be roughly even. sleeping for " + currentWait + "ms");
+        continue;
+      }
+      balancingWorked = true;
+    }
+
+    Assert.assertTrue("did not properly balance", balancingWorked);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
new file mode 100644
index 0000000..14295c4
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes 2^18 rows into a table, lowers the split threshold so the table splits, and repeatedly looks up 100 pseudo-randomly chosen rows with a
+ * {@link BatchScanner}, checking every time that exactly the expected values are returned.
+ */
+public class BatchScanSplitIT extends AccumuloClusterHarness {
+  private static final Logger log = LoggerFactory.getLogger(BatchScanSplitIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.TSERV_MAJC_DELAY, "0");
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  /**
+   * Ingests the rows, waits until the table reports at least two split points and then runs 20 batch scans over the same 100 single-row ranges.
+   *
+   * @throws Exception
+   *           if a batch scan returns anything other than the expected row to value mapping, or if a cluster operation fails
+   */
+  @Test
+  public void test() throws Exception {
+    Connector c = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+
+    int numRows = 1 << 18;
+
+    BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig());
+
+    // row i is the zero padded hex form of i, its value the zero padded hex form of (numRows - i)
+    for (int i = 0; i < numRows; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i)));
+      m.put(new Text("cf1"), new Text("cq1"), new Value(String.format("%016x", numRows - i).getBytes(UTF_8)));
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    getConnector().tableOperations().flush(tableName, null, null, true);
+
+    getConnector().tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "4K");
+
+    // poll until the lowered threshold has produced at least two split points
+    Collection<Text> splits = getConnector().tableOperations().listSplits(tableName);
+    while (splits.size() < 2) {
+      UtilWaitThread.sleep(1);
+      splits = getConnector().tableOperations().listSplits(tableName);
+    }
+
+    log.info("splits : " + splits);
+
+    // fixed seed, so the same rows are looked up on every run
+    Random random = new Random(19011230);
+    HashMap<Text,Value> expected = new HashMap<Text,Value>();
+    ArrayList<Range> ranges = new ArrayList<Range>();
+    for (int i = 0; i < 100; i++) {
+      int r = random.nextInt(numRows);
+      Text row = new Text(String.format("%09x", r));
+      expected.put(row, new Value(String.format("%016x", numRows - r).getBytes(UTF_8)));
+      ranges.add(new Range(row));
+    }
+
+    HashMap<Text,Value> found = new HashMap<Text,Value>();
+
+    for (int i = 0; i < 20; i++) {
+      BatchScanner bs = getConnector().createBatchScanner(tableName, Authorizations.EMPTY, 4);
+
+      found.clear();
+
+      long t1 = System.currentTimeMillis();
+
+      try {
+        bs.setRanges(ranges);
+
+        for (Entry<Key,Value> entry : bs) {
+          found.put(entry.getKey().getRow(), entry.getValue());
+        }
+      } finally {
+        // close the scanner even when the scan fails
+        bs.close();
+      }
+
+      long t2 = System.currentTimeMillis();
+
+      log.info(String.format("rate : %06.2f%n", ranges.size() / ((t2 - t1) / 1000.0)));
+
+      if (!found.equals(expected)) {
+        throw new Exception("Found and expected differ " + found + " " + expected);
+      }
+    }
+
+    splits = getConnector().tableOperations().listSplits(tableName);
+    log.info("splits : " + splits);
+  }
+
+}