You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/06/04 20:53:21 UTC
[40/43] accumulo git commit: ACCUMULO-3871 move ITs into distro jar,
stop building test jar
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/FileArchiveIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/FileArchiveIT.java b/test/src/main/java/org/apache/accumulo/test/FileArchiveIT.java
new file mode 100644
index 0000000..8e51984
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/FileArchiveIT.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Tests that files are archived instead of deleted when configured.
+ */
+public class FileArchiveIT extends ConfigurableMacBase {
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 2 * 60;
+ }
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+ cfg.setProperty(Property.GC_FILE_ARCHIVE, "true");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+ cfg.setProperty(Property.GC_CYCLE_START, "1s");
+ }
+
+ @Test
+ public void testUnusuedFilesAreArchived() throws Exception {
+ final Connector conn = getConnector();
+ final String tableName = getUniqueNames(1)[0];
+
+ conn.tableOperations().create(tableName);
+
+ final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+ Assert.assertNotNull("Could not get table ID", tableId);
+
+ BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("row");
+ m.put("", "", "value");
+ bw.addMutation(m);
+ bw.close();
+
+ // Compact memory to disk
+ conn.tableOperations().compact(tableName, null, null, true, true);
+
+ Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+ s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+ Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+ final String file = entry.getKey().getColumnQualifier().toString();
+ final Path p = new Path(file);
+
+ // Then force another to make an unreferenced file
+ conn.tableOperations().compact(tableName, null, null, true, true);
+
+ log.info("File for table: " + file);
+
+ FileSystem fs = getCluster().getFileSystem();
+ int i = 0;
+ while (fs.exists(p)) {
+ i++;
+ Thread.sleep(1000);
+ if (0 == i % 10) {
+ log.info("Waited " + i + " iterations, file still exists");
+ }
+ }
+
+ log.info("File was removed");
+
+ String filePath = p.toUri().getPath().substring(getCluster().getConfig().getAccumuloDir().toString().length());
+
+ log.info("File relative to accumulo dir: " + filePath);
+
+ Path fileArchiveDir = new Path(getCluster().getConfig().getAccumuloDir().toString(), ServerConstants.FILE_ARCHIVE_DIR);
+
+ Assert.assertTrue("File archive directory didn't exist", fs.exists(fileArchiveDir));
+
+ // Remove the leading '/' to make sure Path treats the 2nd arg as a child.
+ Path archivedFile = new Path(fileArchiveDir, filePath.substring(1));
+
+ Assert.assertTrue("File doesn't exists in archive directory: " + archivedFile, fs.exists(archivedFile));
+ }
+
+ @Test
+ public void testDeletedTableIsArchived() throws Exception {
+ final Connector conn = getConnector();
+ final String tableName = getUniqueNames(1)[0];
+
+ conn.tableOperations().create(tableName);
+
+ final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+ Assert.assertNotNull("Could not get table ID", tableId);
+
+ BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("row");
+ m.put("", "", "value");
+ bw.addMutation(m);
+ bw.close();
+
+ // Compact memory to disk
+ conn.tableOperations().compact(tableName, null, null, true, true);
+
+ Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+ s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+ Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+ final String file = entry.getKey().getColumnQualifier().toString();
+ final Path p = new Path(file);
+
+ conn.tableOperations().delete(tableName);
+
+ log.info("File for table: " + file);
+
+ FileSystem fs = getCluster().getFileSystem();
+ int i = 0;
+ while (fs.exists(p)) {
+ i++;
+ Thread.sleep(1000);
+ if (0 == i % 10) {
+ log.info("Waited " + i + " iterations, file still exists");
+ }
+ }
+
+ log.info("File was removed");
+
+ String filePath = p.toUri().getPath().substring(getCluster().getConfig().getAccumuloDir().toString().length());
+
+ log.info("File relative to accumulo dir: " + filePath);
+
+ Path fileArchiveDir = new Path(getCluster().getConfig().getAccumuloDir().toString(), ServerConstants.FILE_ARCHIVE_DIR);
+
+ Assert.assertTrue("File archive directory didn't exist", fs.exists(fileArchiveDir));
+
+ // Remove the leading '/' to make sure Path treats the 2nd arg as a child.
+ Path archivedFile = new Path(fileArchiveDir, filePath.substring(1));
+
+ Assert.assertTrue("File doesn't exists in archive directory: " + archivedFile, fs.exists(archivedFile));
+ }
+
+ @Test
+ public void testUnusuedFilesAndDeletedTable() throws Exception {
+ final Connector conn = getConnector();
+ final String tableName = getUniqueNames(1)[0];
+
+ conn.tableOperations().create(tableName);
+
+ final String tableId = conn.tableOperations().tableIdMap().get(tableName);
+ Assert.assertNotNull("Could not get table ID", tableId);
+
+ BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("row");
+ m.put("", "", "value");
+ bw.addMutation(m);
+ bw.close();
+
+ // Compact memory to disk
+ conn.tableOperations().compact(tableName, null, null, true, true);
+
+ Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+ s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+ Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+ final String file = entry.getKey().getColumnQualifier().toString();
+ final Path p = new Path(file);
+
+ // Then force another to make an unreferenced file
+ conn.tableOperations().compact(tableName, null, null, true, true);
+
+ log.info("File for table: " + file);
+
+ FileSystem fs = getCluster().getFileSystem();
+ int i = 0;
+ while (fs.exists(p)) {
+ i++;
+ Thread.sleep(1000);
+ if (0 == i % 10) {
+ log.info("Waited " + i + " iterations, file still exists");
+ }
+ }
+
+ log.info("File was removed");
+
+ String filePath = p.toUri().getPath().substring(getCluster().getConfig().getAccumuloDir().toString().length());
+
+ log.info("File relative to accumulo dir: " + filePath);
+
+ Path fileArchiveDir = new Path(getCluster().getConfig().getAccumuloDir().toString(), ServerConstants.FILE_ARCHIVE_DIR);
+
+ Assert.assertTrue("File archive directory didn't exist", fs.exists(fileArchiveDir));
+
+ // Remove the leading '/' to make sure Path treats the 2nd arg as a child.
+ Path archivedFile = new Path(fileArchiveDir, filePath.substring(1));
+
+ Assert.assertTrue("File doesn't exists in archive directory: " + archivedFile, fs.exists(archivedFile));
+
+ // Offline the table so we can be sure there is a single file
+ conn.tableOperations().offline(tableName, true);
+
+ // See that the file in metadata currently is
+ s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+ s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+ entry = Iterables.getOnlyElement(s);
+ final String finalFile = entry.getKey().getColumnQualifier().toString();
+ final Path finalPath = new Path(finalFile);
+
+ conn.tableOperations().delete(tableName);
+
+ log.info("File for table: " + finalPath);
+
+ i = 0;
+ while (fs.exists(finalPath)) {
+ i++;
+ Thread.sleep(1000);
+ if (0 == i % 10) {
+ log.info("Waited " + i + " iterations, file still exists");
+ }
+ }
+
+ log.info("File was removed");
+
+ String finalFilePath = finalPath.toUri().getPath().substring(getCluster().getConfig().getAccumuloDir().toString().length());
+
+ log.info("File relative to accumulo dir: " + finalFilePath);
+
+ Assert.assertTrue("File archive directory didn't exist", fs.exists(fileArchiveDir));
+
+ // Remove the leading '/' to make sure Path treats the 2nd arg as a child.
+ Path finalArchivedFile = new Path(fileArchiveDir, finalFilePath.substring(1));
+
+ Assert.assertTrue("File doesn't exists in archive directory: " + finalArchivedFile, fs.exists(finalArchivedFile));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/GarbageCollectWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/GarbageCollectWALIT.java b/test/src/main/java/org/apache/accumulo/test/GarbageCollectWALIT.java
new file mode 100644
index 0000000..141ee27
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/GarbageCollectWALIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class GarbageCollectWALIT extends ConfigurableMacBase {
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty(Property.INSTANCE_ZK_HOST, "5s");
+ cfg.setProperty(Property.GC_CYCLE_START, "1s");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+ cfg.setNumTservers(1);
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void test() throws Exception {
+ // not yet, please
+ String tableName = getUniqueNames(1)[0];
+ cluster.getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+ Connector c = getConnector();
+ c.tableOperations().create(tableName);
+ // count the number of WALs in the filesystem
+ assertEquals(2, countWALsInFS(cluster));
+ cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
+ cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
+ cluster.getClusterControl().start(ServerType.TABLET_SERVER);
+ Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+ // let GC run
+ UtilWaitThread.sleep(3 * 5 * 1000);
+ assertEquals(2, countWALsInFS(cluster));
+ }
+
+ private int countWALsInFS(MiniAccumuloClusterImpl cluster) throws Exception {
+ FileSystem fs = cluster.getFileSystem();
+ RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(cluster.getConfig().getAccumuloDir() + "/wal"), true);
+ int result = 0;
+ while (iterator.hasNext()) {
+ LocatedFileStatus next = iterator.next();
+ if (!next.isDirectory()) {
+ result++;
+ }
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java b/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java
new file mode 100644
index 0000000..55d83f5
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ImportTable didn't correctly place absolute paths in metadata. This resulted in the imported table only being usable when the actual HDFS directory for
+ * Accumulo was the same as Property.INSTANCE_DFS_DIR. If any other HDFS directory was used, any interactions with the table would fail because the relative
+ * path in the metadata table (created by the ImportTable process) would be converted to a non-existent absolute path.
+ * <p>
+ * ACCUMULO-3215
+ *
+ */
+public class ImportExportIT extends AccumuloClusterHarness {
+
+ private static final Logger log = LoggerFactory.getLogger(ImportExportIT.class);
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Test
+ public void testExportImportThenScan() throws Exception {
+ Connector conn = getConnector();
+
+ String[] tableNames = getUniqueNames(2);
+ String srcTable = tableNames[0], destTable = tableNames[1];
+ conn.tableOperations().create(srcTable);
+
+ BatchWriter bw = conn.createBatchWriter(srcTable, new BatchWriterConfig());
+ for (int row = 0; row < 1000; row++) {
+ Mutation m = new Mutation(Integer.toString(row));
+ for (int col = 0; col < 100; col++) {
+ m.put(Integer.toString(col), "", Integer.toString(col * 2));
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ conn.tableOperations().compact(srcTable, null, null, true, true);
+
+ // Make a directory we can use to throw the export and import directories
+ // Must exist on the filesystem the cluster is running.
+ FileSystem fs = cluster.getFileSystem();
+ Path tmp = cluster.getTemporaryPath();
+ log.info("Using FileSystem: " + fs);
+ Path baseDir = new Path(tmp, getClass().getName());
+ if (fs.exists(baseDir)) {
+ log.info("{} exists on filesystem, deleting", baseDir);
+ assertTrue("Failed to deleted " + baseDir, fs.delete(baseDir, true));
+ }
+ log.info("Creating {}", baseDir);
+ assertTrue("Failed to create " + baseDir, fs.mkdirs(baseDir));
+ Path exportDir = new Path(baseDir, "export");
+ Path importDir = new Path(baseDir, "import");
+ for (Path p : new Path[] {exportDir, importDir}) {
+ assertTrue("Failed to create " + baseDir, fs.mkdirs(p));
+ }
+
+ FsShell fsShell = new FsShell(fs.getConf());
+ assertEquals("Failed to chmod " + baseDir, 0, fsShell.run(new String[] {"-chmod", "-R", "777", baseDir.toString()}));
+
+ log.info("Exporting table to {}", exportDir);
+ log.info("Importing table from {}", importDir);
+
+ // Offline the table
+ conn.tableOperations().offline(srcTable, true);
+ // Then export it
+ conn.tableOperations().exportTable(srcTable, exportDir.toString());
+
+ // Make sure the distcp.txt file that exporttable creates is available
+ Path distcp = new Path(exportDir, "distcp.txt");
+ Assert.assertTrue("Distcp file doesn't exist", fs.exists(distcp));
+ FSDataInputStream is = fs.open(distcp);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+
+ // Copy each file that was exported to the import directory
+ String line;
+ while (null != (line = reader.readLine())) {
+ Path p = new Path(line.substring(5));
+ Assert.assertTrue("File doesn't exist: " + p, fs.exists(p));
+
+ Path dest = new Path(importDir, p.getName());
+ Assert.assertFalse("Did not expect " + dest + " to exist", fs.exists(dest));
+ FileUtil.copy(fs, p, fs, dest, false, fs.getConf());
+ }
+
+ reader.close();
+
+ log.info("Import dir: {}", Arrays.toString(fs.listStatus(importDir)));
+
+ // Import the exported data into a new table
+ conn.tableOperations().importTable(destTable, importDir.toString());
+
+ // Get the table ID for the table that the importtable command created
+ final String tableId = conn.tableOperations().tableIdMap().get(destTable);
+ Assert.assertNotNull(tableId);
+
+ // Get all `file` colfams from the metadata table for the new table
+ log.info("Imported into table with ID: {}", tableId);
+ Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+ s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(s);
+
+ // Should find a single entry
+ for (Entry<Key,Value> fileEntry : s) {
+ Key k = fileEntry.getKey();
+ String value = fileEntry.getValue().toString();
+ if (k.getColumnFamily().equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) {
+ // The file should be an absolute URI (file:///...), not a relative path (/b-000.../I000001.rf)
+ String fileUri = k.getColumnQualifier().toString();
+ Assert.assertFalse("Imported files should have absolute URIs, not relative: " + fileUri, looksLikeRelativePath(fileUri));
+ } else if (k.getColumnFamily().equals(MetadataSchema.TabletsSection.ServerColumnFamily.NAME)) {
+ Assert.assertFalse("Server directory should have absolute URI, not relative: " + value, looksLikeRelativePath(value));
+ } else {
+ Assert.fail("Got expected pair: " + k + "=" + fileEntry.getValue());
+ }
+ }
+
+ // Online the original table before we verify equivalence
+ conn.tableOperations().online(srcTable, true);
+
+ verifyTableEquality(conn, srcTable, destTable);
+ }
+
+ private void verifyTableEquality(Connector conn, String srcTable, String destTable) throws Exception {
+ Iterator<Entry<Key,Value>> src = conn.createScanner(srcTable, Authorizations.EMPTY).iterator(), dest = conn.createScanner(destTable, Authorizations.EMPTY)
+ .iterator();
+ Assert.assertTrue("Could not read any data from source table", src.hasNext());
+ Assert.assertTrue("Could not read any data from destination table", dest.hasNext());
+ while (src.hasNext() && dest.hasNext()) {
+ Entry<Key,Value> orig = src.next(), copy = dest.next();
+ Assert.assertEquals(orig.getKey(), copy.getKey());
+ Assert.assertEquals(orig.getValue(), copy.getValue());
+ }
+ Assert.assertFalse("Source table had more data to read", src.hasNext());
+ Assert.assertFalse("Dest table had more data to read", dest.hasNext());
+ }
+
+ private boolean looksLikeRelativePath(String uri) {
+ if (uri.startsWith("/" + Constants.BULK_PREFIX)) {
+ if ('/' == uri.charAt(10)) {
+ return true;
+ }
+ } else if (uri.startsWith("/" + Constants.CLONE_PREFIX)) {
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/IntegrationTestMapReduce.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/IntegrationTestMapReduce.java b/test/src/main/java/org/apache/accumulo/test/IntegrationTestMapReduce.java
new file mode 100644
index 0000000..e33f3a9
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/IntegrationTestMapReduce.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.runner.Description;
+import org.junit.runner.JUnitCore;
+import org.junit.runner.Result;
+import org.junit.runner.notification.Failure;
+import org.junit.runner.notification.RunListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IntegrationTestMapReduce extends Configured implements Tool {
+
+ private static final Logger log = LoggerFactory.getLogger(IntegrationTestMapReduce.class);
+
+ public static class TestMapper extends Mapper<LongWritable,Text,IntWritable,Text> {
+
+ @Override
+ protected void map(LongWritable key, Text value, final Mapper<LongWritable,Text,IntWritable,Text>.Context context) throws IOException, InterruptedException {
+ String className = value.toString();
+ if (className.trim().isEmpty()) {
+ return;
+ }
+ Class<? extends Object> test = null;
+ try {
+ test = Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ log.debug("Error finding class {}", className, e);
+ context.write(new IntWritable(-1), new Text(e.toString()));
+ return;
+ }
+ JUnitCore core = new JUnitCore();
+ core.addListener(new RunListener() {
+
+ @Override
+ public void testStarted(Description description) throws Exception {
+ log.info("Starting {}", description);
+ context.progress();
+ }
+
+ @Override
+ public void testFinished(Description description) throws Exception {
+ log.info("Finished {}", description);
+ context.progress();
+ }
+
+ @Override
+ public void testFailure(Failure failure) throws Exception {
+ log.info("Test failed: {}", failure.getDescription(), failure.getException());
+ context.progress();
+ }
+
+ });
+ log.info("Running test {}", className);
+ try {
+ Result result = core.run(test);
+ if (result.wasSuccessful()) {
+ log.info("{} was successful", className);
+ context.write(new IntWritable(0), value);
+ } else {
+ log.info("{} failed", className);
+ context.write(new IntWritable(1), value);
+ }
+ } catch (Exception e) {
+ // most likely JUnit issues, like no tests to run
+ log.info("Test failed: {}", className, e);
+ }
+ }
+ }
+
+ public static class TestReducer extends Reducer<IntWritable,Text,IntWritable,Text> {
+
+ @Override
+ protected void reduce(IntWritable code, Iterable<Text> tests, Reducer<IntWritable,Text,IntWritable,Text>.Context context) throws IOException,
+ InterruptedException {
+ StringBuffer result = new StringBuffer();
+ for (Text test : tests) {
+ result.append(test);
+ result.append("\n");
+ }
+ context.write(code, new Text(result.toString()));
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ // read a list of tests from the input, and print out the results
+ if (args.length != 2) {
+ System.err.println("Wrong number of args: <input> <output>");
+ }
+ Configuration conf = getConf();
+ Job job = Job.getInstance(conf, "accumulo integration test runner");
+ // read one line at a time
+ job.setInputFormatClass(NLineInputFormat.class);
+ conf.setInt(NLineInputFormat.LINES_PER_MAP, 1);
+
+ // run the test
+ job.setJarByClass(IntegrationTestMapReduce.class);
+ job.setMapperClass(TestMapper.class);
+
+ // group test by result code
+ job.setReducerClass(TestReducer.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(Text.class);
+
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(new IntegrationTestMapReduce(), args));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java b/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java
new file mode 100644
index 0000000..a272bc2
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.ActiveScan;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+// ACCUMULO-3030
+public class InterruptibleScannersIT extends AccumuloClusterHarness {
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
+ }
+
+ @Test
+ public void test() throws Exception {
+ // make a table
+ final String tableName = getUniqueNames(1)[0];
+ final Connector conn = getConnector();
+ conn.tableOperations().create(tableName);
+ // make the world's slowest scanner
+ final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+ final IteratorSetting cfg = new IteratorSetting(100, SlowIterator.class);
+ // Wait long enough to be sure we can catch it, but not indefinitely.
+ SlowIterator.setSeekSleepTime(cfg, 60 * 1000);
+ scanner.addScanIterator(cfg);
+ // create a thread to interrupt the slow scan
+ final Thread scanThread = Thread.currentThread();
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ // ensure the scan is running: not perfect, the metadata tables could be scanned, too.
+ String tserver = conn.instanceOperations().getTabletServers().iterator().next();
+ do {
+ ArrayList<ActiveScan> scans = new ArrayList<ActiveScan>(conn.instanceOperations().getActiveScans(tserver));
+ Iterator<ActiveScan> iter = scans.iterator();
+ while (iter.hasNext()) {
+ ActiveScan scan = iter.next();
+ // Remove scans not against our table and not owned by us
+ if (!getAdminPrincipal().equals(scan.getUser()) || !tableName.equals(scan.getTable())) {
+ iter.remove();
+ }
+ }
+
+ if (!scans.isEmpty()) {
+ // We found our scan
+ break;
+ }
+ } while (true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ // BAM!
+ scanThread.interrupt();
+ }
+ };
+ thread.start();
+ try {
+ // Use the scanner, expect problems
+ Iterators.size(scanner.iterator());
+ Assert.fail("Scan should not succeed");
+ } catch (Exception ex) {} finally {
+ thread.join();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/KeyValueEqualityIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/KeyValueEqualityIT.java b/test/src/main/java/org/apache/accumulo/test/KeyValueEqualityIT.java
new file mode 100644
index 0000000..b0734b4
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/KeyValueEqualityIT.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class KeyValueEqualityIT extends AccumuloClusterHarness {
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Test
+ public void testEquality() throws Exception {
+ Connector conn = this.getConnector();
+ final BatchWriterConfig config = new BatchWriterConfig();
+
+ final String[] tables = getUniqueNames(2);
+ final String table1 = tables[0], table2 = tables[1];
+ final TableOperations tops = conn.tableOperations();
+ tops.create(table1);
+ tops.create(table2);
+
+ final BatchWriter bw1 = conn.createBatchWriter(table1, config), bw2 = conn.createBatchWriter(table2, config);
+
+ for (int row = 0; row < 100; row++) {
+ Mutation m = new Mutation(Integer.toString(row));
+ for (int col = 0; col < 10; col++) {
+ m.put(Integer.toString(col), "", System.currentTimeMillis(), Integer.toString(col * 2));
+ }
+ bw1.addMutation(m);
+ bw2.addMutation(m);
+ }
+
+ bw1.close();
+ bw2.close();
+
+ Iterator<Entry<Key,Value>> t1 = conn.createScanner(table1, Authorizations.EMPTY).iterator(), t2 = conn.createScanner(table2, Authorizations.EMPTY)
+ .iterator();
+ while (t1.hasNext() && t2.hasNext()) {
+ // KeyValue, the implementation of Entry<Key,Value>, should support equality and hashCode properly
+ Entry<Key,Value> e1 = t1.next(), e2 = t2.next();
+ Assert.assertEquals(e1, e2);
+ Assert.assertEquals(e1.hashCode(), e2.hashCode());
+ }
+ Assert.assertFalse("table1 had more data to read", t1.hasNext());
+ Assert.assertFalse("table2 had more data to read", t2.hasNext());
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/LargeSplitRowIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/LargeSplitRowIT.java b/test/src/main/java/org/apache/accumulo/test/LargeSplitRowIT.java
new file mode 100644
index 0000000..479bb0e
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/LargeSplitRowIT.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.AccumuloServerException;
+import org.apache.accumulo.core.client.impl.Namespaces;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LargeSplitRowIT extends ConfigurableMacBase {
+ static private final Logger log = LoggerFactory.getLogger(LargeSplitRowIT.class);
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
+
+ Map<String,String> siteConfig = new HashMap<String,String>();
+ siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "50ms");
+ cfg.setSiteConfig(siteConfig);
+ }
+
+ // User added split
+ @Test(timeout = 60 * 1000)
+ public void userAddedSplit() throws Exception {
+
+ log.info("User added split");
+
+ // make a table and lower the TABLE_END_ROW_MAX_SIZE property
+ final String tableName = getUniqueNames(1)[0];
+ final Connector conn = getConnector();
+ conn.tableOperations().create(tableName);
+ conn.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(), "1000");
+
+ // Create a BatchWriter and add a mutation to the table
+ BatchWriter batchWriter = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("Row");
+ m.put("cf", "cq", "value");
+ batchWriter.addMutation(m);
+ batchWriter.close();
+
+ // Create a split point that is too large to be an end row and fill it with all 'm'
+ SortedSet<Text> partitionKeys = new TreeSet<Text>();
+ byte data[] = new byte[(int) (TableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE.getDefaultValue()) + 2)];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = 'm';
+ }
+ partitionKeys.add(new Text(data));
+
+ // try to add the split point that is too large, if the split point is created the test fails.
+ try {
+ conn.tableOperations().addSplits(tableName, partitionKeys);
+ Assert.fail();
+ } catch (AccumuloServerException e) {}
+
+ // Make sure that the information that was written to the table before we tried to add the split point is still correct
+ int counter = 0;
+ final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+ for (Entry<Key,Value> entry : scanner) {
+ counter++;
+ Key k = entry.getKey();
+ Assert.assertEquals("Row", k.getRow().toString());
+ Assert.assertEquals("cf", k.getColumnFamily().toString());
+ Assert.assertEquals("cq", k.getColumnQualifier().toString());
+ Assert.assertEquals("value", entry.getValue().toString());
+
+ }
+ // Make sure there is only one line in the table
+ Assert.assertEquals(1, counter);
+ }
+
+ // Test tablet server split with 250 entries with all the same prefix
+ @Test(timeout = 60 * 1000)
+ public void automaticSplitWith250Same() throws Exception {
+ log.info("Automatic with 250 with same prefix");
+
+ // make a table and lower the configure properties
+ final String tableName = getUniqueNames(1)[0];
+ final Connector conn = getConnector();
+ conn.tableOperations().create(tableName);
+ conn.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+ conn.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none");
+ conn.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64");
+ conn.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(), "1000");
+
+ // Create a BatchWriter and key for a table entry that is longer than the allowed size for an end row
+ // Fill this key with all m's except the last spot
+ BatchWriter batchWriter = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ byte data[] = new byte[(int) (TableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE.getDefaultValue()) + 2)];
+ for (int i = 0; i < data.length - 1; i++) {
+ data[i] = (byte) 'm';
+ }
+
+ // Make the last place in the key different for every entry added to the table
+ for (int i = 0; i < 250; i++) {
+ data[data.length - 1] = (byte) i;
+ Mutation m = new Mutation(data);
+ m.put("cf", "cq", "value");
+ batchWriter.addMutation(m);
+ }
+ // Flush the BatchWriter and table and sleep for a bit to make sure that there is enough time for the table to split if need be.
+ batchWriter.close();
+ conn.tableOperations().flush(tableName, new Text(), new Text("z"), true);
+ Thread.sleep(500);
+
+ // Make sure all the data that was put in the table is still correct
+ int count = 0;
+ final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+ for (Entry<Key,Value> entry : scanner) {
+ Key k = entry.getKey();
+ data[data.length - 1] = (byte) count;
+ String expected = new String(data, UTF_8);
+ Assert.assertEquals(expected, k.getRow().toString());
+ Assert.assertEquals("cf", k.getColumnFamily().toString());
+ Assert.assertEquals("cq", k.getColumnQualifier().toString());
+ Assert.assertEquals("value", entry.getValue().toString());
+ count++;
+ }
+ Assert.assertEquals(250, count);
+
+ // Make sure no splits occurred in the table
+ Assert.assertEquals(0, conn.tableOperations().listSplits(tableName).size());
+ }
+
+ // 10 0's; 10 2's; 10 4's... 10 30's etc
+ @Test(timeout = 60 * 1000)
+ public void automaticSplitWithGaps() throws Exception {
+ log.info("Automatic Split With Gaps");
+
+ automaticSplit(30, 2);
+ }
+
+ // 10 0's; 10 1's; 10 2's... 10 15's etc
+ @Test(timeout = 60 * 1000)
+ public void automaticSplitWithoutGaps() throws Exception {
+ log.info("Automatic Split Without Gaps");
+
+ automaticSplit(15, 1);
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void automaticSplitLater() throws Exception {
+ log.info("Split later");
+ automaticSplit(15, 1);
+
+ final Connector conn = getConnector();
+
+ String tableName = new String();
+ java.util.Iterator<String> iterator = conn.tableOperations().list().iterator();
+
+ while (iterator.hasNext()) {
+ String curr = iterator.next();
+ if (!curr.startsWith(Namespaces.ACCUMULO_NAMESPACE + ".")) {
+ tableName = curr;
+ }
+ }
+
+ // Create a BatchWriter and key for a table entry that is longer than the allowed size for an end row
+ BatchWriter batchWriter = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ byte data[] = new byte[10];
+
+ // Fill key with all j's except for last spot which alternates through 1 through 10 for every j value
+ for (int j = 15; j < 150; j += 1) {
+ for (int i = 0; i < data.length - 1; i++) {
+ data[i] = (byte) j;
+ }
+
+ for (int i = 0; i < 25; i++) {
+ data[data.length - 1] = (byte) i;
+ Mutation m = new Mutation(data);
+ m.put("cf", "cq", "value");
+ batchWriter.addMutation(m);
+ }
+ }
+ // Flush the BatchWriter and table and sleep for a bit to make sure that there is enough time for the table to split if need be.
+ batchWriter.close();
+ conn.tableOperations().flush(tableName, new Text(), new Text("z"), true);
+
+ // Make sure a split occurs
+ while (conn.tableOperations().listSplits(tableName).size() == 0) {
+ Thread.sleep(250);
+ }
+
+ Assert.assertTrue(0 < conn.tableOperations().listSplits(tableName).size());
+ }
+
+ private void automaticSplit(int max, int spacing) throws Exception {
+ // make a table and lower the configure properties
+ final String tableName = getUniqueNames(1)[0];
+ final Connector conn = getConnector();
+ conn.tableOperations().create(tableName);
+ conn.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+ conn.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none");
+ conn.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64");
+ conn.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(), "1000");
+
+ // Create a BatchWriter and key for a table entry that is longer than the allowed size for an end row
+ BatchWriter batchWriter = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ byte data[] = new byte[(int) (TableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE.getDefaultValue()) + 2)];
+
+ // Fill key with all j's except for last spot which alternates through 1 through 10 for every j value
+ for (int j = 0; j < max; j += spacing) {
+ for (int i = 0; i < data.length - 1; i++) {
+ data[i] = (byte) j;
+ }
+
+ for (int i = 0; i < 10; i++) {
+ data[data.length - 1] = (byte) i;
+ Mutation m = new Mutation(data);
+ m.put("cf", "cq", "value");
+ batchWriter.addMutation(m);
+ }
+ }
+ // Flush the BatchWriter and table and sleep for a bit to make sure that there is enough time for the table to split if need be.
+ batchWriter.close();
+ conn.tableOperations().flush(tableName, new Text(), new Text("z"), true);
+ Thread.sleep(500);
+
+ // Make sure all the data that was put in the table is still correct
+ int count = 0;
+ int extra = 10;
+ final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+ for (Entry<Key,Value> entry : scanner) {
+ if (extra == 10) {
+ extra = 0;
+ for (int i = 0; i < data.length - 1; i++) {
+ data[i] = (byte) count;
+ }
+ count += spacing;
+
+ }
+ Key k = entry.getKey();
+ data[data.length - 1] = (byte) extra;
+ String expected = new String(data, UTF_8);
+ Assert.assertEquals(expected, k.getRow().toString());
+ Assert.assertEquals("cf", k.getColumnFamily().toString());
+ Assert.assertEquals("cq", k.getColumnQualifier().toString());
+ Assert.assertEquals("value", entry.getValue().toString());
+ extra++;
+ }
+ Assert.assertEquals(10, extra);
+ Assert.assertEquals(max, count);
+
+ // Make sure no splits occured in the table
+ Assert.assertEquals(0, conn.tableOperations().listSplits(tableName).size());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
new file mode 100644
index 0000000..9babeba
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.RootTabletStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class MasterRepairsDualAssignmentIT extends ConfigurableMacBase {
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 5 * 60;
+ }
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+ cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "5s");
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Test
+ public void test() throws Exception {
+ // make some tablets, spread 'em around
+ Connector c = getConnector();
+ ClientContext context = new ClientContext(c.getInstance(), new Credentials("root", new PasswordToken(ROOT_PASSWORD)), getClientConfig());
+ String table = this.getUniqueNames(1)[0];
+ c.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+ c.securityOperations().grantTablePermission("root", RootTable.NAME, TablePermission.WRITE);
+ c.tableOperations().create(table);
+ SortedSet<Text> partitions = new TreeSet<Text>();
+ for (String part : "a b c d e f g h i j k l m n o p q r s t u v w x y z".split(" ")) {
+ partitions.add(new Text(part));
+ }
+ c.tableOperations().addSplits(table, partitions);
+ // scan the metadata table and get the two table location states
+ Set<TServerInstance> states = new HashSet<TServerInstance>();
+ Set<TabletLocationState> oldLocations = new HashSet<TabletLocationState>();
+ MetaDataStateStore store = new MetaDataStateStore(context, null);
+ while (states.size() < 2) {
+ UtilWaitThread.sleep(250);
+ oldLocations.clear();
+ for (TabletLocationState tls : store) {
+ if (tls.current != null) {
+ states.add(tls.current);
+ oldLocations.add(tls);
+ }
+ }
+ }
+ assertEquals(2, states.size());
+ // Kill a tablet server... we don't care which one... wait for everything to be reassigned
+ cluster.killProcess(ServerType.TABLET_SERVER, cluster.getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
+ Set<TServerInstance> replStates = new HashSet<>();
+ // Find out which tablet server remains
+ while (true) {
+ UtilWaitThread.sleep(1000);
+ states.clear();
+ replStates.clear();
+ boolean allAssigned = true;
+ for (TabletLocationState tls : store) {
+ if (tls != null && tls.current != null) {
+ states.add(tls.current);
+ } else if (tls != null && tls.extent.equals(new KeyExtent(new Text(ReplicationTable.ID), null, null))) {
+ replStates.add(tls.current);
+ } else {
+ allAssigned = false;
+ }
+ }
+ System.out.println(states + " size " + states.size() + " allAssigned " + allAssigned);
+ if (states.size() != 2 && allAssigned == true)
+ break;
+ }
+ assertEquals(1, replStates.size());
+ assertEquals(1, states.size());
+ // pick an assigned tablet and assign it to the old tablet
+ TabletLocationState moved = null;
+ for (TabletLocationState old : oldLocations) {
+ if (!states.contains(old.current)) {
+ moved = old;
+ }
+ }
+ assertNotEquals(null, moved);
+ // throw a mutation in as if we were the dying tablet
+ BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ Mutation assignment = new Mutation(moved.extent.getMetadataEntry());
+ moved.current.putLocation(assignment);
+ bw.addMutation(assignment);
+ bw.close();
+ // wait for the master to fix the problem
+ waitForCleanStore(store);
+ // now jam up the metadata table
+ bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ assignment = new Mutation(new KeyExtent(new Text(MetadataTable.ID), null, null).getMetadataEntry());
+ moved.current.putLocation(assignment);
+ bw.addMutation(assignment);
+ bw.close();
+ waitForCleanStore(new RootTabletStateStore(context, null));
+ }
+
+ private void waitForCleanStore(MetaDataStateStore store) {
+ while (true) {
+ try {
+ Iterators.size(store.iterator());
+ } catch (Exception ex) {
+ System.out.println(ex);
+ UtilWaitThread.sleep(250);
+ continue;
+ }
+ break;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/MetaConstraintRetryIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/MetaConstraintRetryIT.java b/test/src/main/java/org/apache/accumulo/test/MetaConstraintRetryIT.java
new file mode 100644
index 0000000..727859f
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/MetaConstraintRetryIT.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test;
+
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.Writer;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class MetaConstraintRetryIT extends AccumuloClusterHarness {
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 30;
+ }
+
+ // a test for ACCUMULO-3096
+ @Test(expected = ConstraintViolationException.class)
+ public void test() throws Exception {
+
+ getConnector().securityOperations().grantTablePermission(getAdminPrincipal(), MetadataTable.NAME, TablePermission.WRITE);
+
+ Credentials credentials = new Credentials(getAdminPrincipal(), getAdminToken());
+ ClientContext context = new ClientContext(getConnector().getInstance(), credentials, cluster.getClientConfig());
+ Writer w = new Writer(context, MetadataTable.ID);
+ KeyExtent extent = new KeyExtent(new Text("5"), null, null);
+
+ Mutation m = new Mutation(extent.getMetadataEntry());
+ // unknown columns should cause contraint violation
+ m.put("badcolfam", "badcolqual", "3");
+
+ try {
+ MetadataTableUtil.update(w, null, m);
+ } catch (RuntimeException e) {
+ if (e.getCause().getClass().equals(ConstraintViolationException.class)) {
+ throw (ConstraintViolationException) e.getCause();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/MetaGetsReadersIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/MetaGetsReadersIT.java b/test/src/main/java/org/apache/accumulo/test/MetaGetsReadersIT.java
new file mode 100644
index 0000000..84a5996
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/MetaGetsReadersIT.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class MetaGetsReadersIT extends ConfigurableMacBase {
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
+ cfg.setProperty(Property.TSERV_SCAN_MAX_OPENFILES, "2");
+ cfg.setProperty(Property.TABLE_BLOCKCACHE_ENABLED, "false");
+ }
+
+ private static Thread slowScan(final Connector c, final String tableName, final AtomicBoolean stop) {
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ while (stop.get() == false) {
+ Scanner s = c.createScanner(tableName, Authorizations.EMPTY);
+ IteratorSetting is = new IteratorSetting(50, SlowIterator.class);
+ SlowIterator.setSleepTime(is, 10);
+ s.addScanIterator(is);
+ Iterator<Entry<Key,Value>> iterator = s.iterator();
+ while (iterator.hasNext() && stop.get() == false) {
+ iterator.next();
+ }
+ }
+ } catch (Exception ex) {
+ log.trace("{}", ex.getMessage(), ex);
+ stop.set(true);
+ }
+ }
+ };
+ return thread;
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void test() throws Exception {
+ final String tableName = getUniqueNames(1)[0];
+ final Connector c = getConnector();
+ c.tableOperations().create(tableName);
+ Random random = new Random();
+ BatchWriter bw = c.createBatchWriter(tableName, null);
+ for (int i = 0; i < 50000; i++) {
+ byte[] row = new byte[100];
+ random.nextBytes(row);
+ Mutation m = new Mutation(row);
+ m.put("", "", "");
+ bw.addMutation(m);
+ }
+ bw.close();
+ c.tableOperations().flush(tableName, null, null, true);
+ final AtomicBoolean stop = new AtomicBoolean(false);
+ Thread t1 = slowScan(c, tableName, stop);
+ t1.start();
+ Thread t2 = slowScan(c, tableName, stop);
+ t2.start();
+ UtilWaitThread.sleep(500);
+ long now = System.currentTimeMillis();
+ Scanner m = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ Iterators.size(m.iterator());
+ long delay = System.currentTimeMillis() - now;
+ System.out.println("Delay = " + delay);
+ assertTrue("metadata table scan was slow", delay < 1000);
+ assertFalse(stop.get());
+ stop.set(true);
+ t1.interrupt();
+ t2.interrupt();
+ t1.join();
+ t2.join();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java b/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java
new file mode 100644
index 0000000..0bc78fb
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetaSplitIT extends AccumuloClusterHarness {
+ private static final Logger log = LoggerFactory.getLogger(MetaSplitIT.class);
+
+ private Collection<Text> metadataSplits = null;
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 3 * 60;
+ }
+
+ @Before
+ public void saveMetadataSplits() throws Exception {
+ if (ClusterType.STANDALONE == getClusterType()) {
+ Connector conn = getConnector();
+ Collection<Text> splits = conn.tableOperations().listSplits(MetadataTable.NAME);
+ // We expect a single split
+ if (!splits.equals(Arrays.asList(new Text("~")))) {
+ log.info("Existing splits on metadata table. Saving them, and applying single original split of '~'");
+ metadataSplits = splits;
+ conn.tableOperations().merge(MetadataTable.NAME, null, null);
+ conn.tableOperations().addSplits(MetadataTable.NAME, new TreeSet<Text>(Collections.singleton(new Text("~"))));
+ }
+ }
+ }
+
+ @After
+ public void restoreMetadataSplits() throws Exception {
+ if (null != metadataSplits) {
+ log.info("Restoring split on metadata table");
+ Connector conn = getConnector();
+ conn.tableOperations().merge(MetadataTable.NAME, null, null);
+ conn.tableOperations().addSplits(MetadataTable.NAME, new TreeSet<Text>(metadataSplits));
+ }
+ }
+
+ @Test(expected = AccumuloException.class)
+ public void testRootTableSplit() throws Exception {
+ TableOperations opts = getConnector().tableOperations();
+ SortedSet<Text> splits = new TreeSet<Text>();
+ splits.add(new Text("5"));
+ opts.addSplits(RootTable.NAME, splits);
+ }
+
+ @Test
+ public void testRootTableMerge() throws Exception {
+ TableOperations opts = getConnector().tableOperations();
+ opts.merge(RootTable.NAME, null, null);
+ }
+
+ private void addSplits(TableOperations opts, String... points) throws Exception {
+ SortedSet<Text> splits = new TreeSet<Text>();
+ for (String point : points) {
+ splits.add(new Text(point));
+ }
+ opts.addSplits(MetadataTable.NAME, splits);
+ }
+
+ @Test
+ public void testMetadataTableSplit() throws Exception {
+ TableOperations opts = getConnector().tableOperations();
+ for (int i = 1; i <= 10; i++) {
+ opts.create(Integer.toString(i));
+ }
+ try {
+ opts.merge(MetadataTable.NAME, new Text("01"), new Text("02"));
+ checkMetadataSplits(1, opts);
+ addSplits(opts, "4 5 6 7 8".split(" "));
+ checkMetadataSplits(6, opts);
+ opts.merge(MetadataTable.NAME, new Text("6"), new Text("9"));
+ checkMetadataSplits(4, opts);
+ addSplits(opts, "44 55 66 77 88".split(" "));
+ checkMetadataSplits(9, opts);
+ opts.merge(MetadataTable.NAME, new Text("5"), new Text("7"));
+ checkMetadataSplits(6, opts);
+ opts.merge(MetadataTable.NAME, null, null);
+ checkMetadataSplits(0, opts);
+ } finally {
+ for (int i = 1; i <= 10; i++) {
+ opts.delete(Integer.toString(i));
+ }
+ }
+ }
+
+ private static void checkMetadataSplits(int numSplits, TableOperations opts) throws AccumuloSecurityException, TableNotFoundException, AccumuloException,
+ InterruptedException {
+ for (int i = 0; i < 10; i++) {
+ if (opts.listSplits(MetadataTable.NAME).size() == numSplits) {
+ break;
+ }
+ Thread.sleep(2000);
+ }
+ Collection<Text> splits = opts.listSplits(MetadataTable.NAME);
+ assertEquals("Actual metadata table splits: " + splits, numSplits, splits.size());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
new file mode 100644
index 0000000..b3bf196
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+
+/**
+ *
+ */
+public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacBase {
+ private static final Logger log = LoggerFactory.getLogger(MissingWalHeaderCompletesRecoveryIT.class);
+
+ private boolean rootHasWritePermission;
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 2 * 60;
+ }
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration conf) {
+ cfg.setNumTservers(1);
+ cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "1s");
+ // Make sure the GC doesn't delete the file before the metadata reference is added
+ cfg.setProperty(Property.GC_CYCLE_START, "999999s");
+ conf.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Before
+ public void setupMetadataPermission() throws Exception {
+ Connector conn = getConnector();
+ rootHasWritePermission = conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+ if (!rootHasWritePermission) {
+ conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+ // Make sure it propagates through ZK
+ Thread.sleep(5000);
+ }
+ }
+
+ @After
+ public void resetMetadataPermission() throws Exception {
+ Connector conn = getConnector();
+ // Final state doesn't match the original
+ if (rootHasWritePermission != conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE)) {
+ if (rootHasWritePermission) {
+ // root had write permission when starting, ensure root still does
+ conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+ } else {
+ // root did not have write permission when starting, ensure that it does not
+ conn.securityOperations().revokeTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+ }
+ }
+ }
+
+ @Test
+ public void testEmptyWalRecoveryCompletes() throws Exception {
+ Connector conn = getConnector();
+ MiniAccumuloClusterImpl cluster = getCluster();
+ FileSystem fs = cluster.getFileSystem();
+
+ // Fake out something that looks like host:port, it's irrelevant
+ String fakeServer = "127.0.0.1:12345";
+
+ File walogs = new File(cluster.getConfig().getAccumuloDir(), ServerConstants.WAL_DIR);
+ File walogServerDir = new File(walogs, fakeServer.replace(':', '+'));
+ File emptyWalog = new File(walogServerDir, UUID.randomUUID().toString());
+
+ log.info("Created empty WAL at " + emptyWalog.toURI());
+
+ fs.create(new Path(emptyWalog.toURI())).close();
+
+ Assert.assertTrue("root user did not have write permission to metadata table",
+ conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE));
+
+ String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+
+ String tableId = conn.tableOperations().tableIdMap().get(tableName);
+ Assert.assertNotNull("Table ID was null", tableId);
+
+ LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId), null, null), 0, "127.0.0.1:12345", emptyWalog.toURI().toString());
+
+ log.info("Taking {} offline", tableName);
+ conn.tableOperations().offline(tableName, true);
+
+ log.info("{} is offline", tableName);
+
+ Text row = MetadataSchema.TabletsSection.getRow(new Text(tableId), null);
+ Mutation m = new Mutation(row);
+ m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
+
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ bw.addMutation(m);
+ bw.close();
+
+ log.info("Bringing {} online", tableName);
+ conn.tableOperations().online(tableName, true);
+
+ log.info("{} is online", tableName);
+
+ // Reading the table implies that recovery completed successfully (the empty file was ignored)
+ // otherwise the tablet will never come online and we won't be able to read it.
+ Scanner s = conn.createScanner(tableName, Authorizations.EMPTY);
+ Assert.assertEquals(0, Iterables.size(s));
+ }
+
+ @Test
+ public void testPartialHeaderWalRecoveryCompletes() throws Exception {
+ Connector conn = getConnector();
+ MiniAccumuloClusterImpl cluster = getCluster();
+ FileSystem fs = getCluster().getFileSystem();
+
+ // Fake out something that looks like host:port, it's irrelevant
+ String fakeServer = "127.0.0.1:12345";
+
+ File walogs = new File(cluster.getConfig().getAccumuloDir(), ServerConstants.WAL_DIR);
+ File walogServerDir = new File(walogs, fakeServer.replace(':', '+'));
+ File partialHeaderWalog = new File(walogServerDir, UUID.randomUUID().toString());
+
+ log.info("Created WAL with malformed header at " + partialHeaderWalog.toURI());
+
+ // Write half of the header
+ FSDataOutputStream wal = fs.create(new Path(partialHeaderWalog.toURI()));
+ wal.write(DfsLogger.LOG_FILE_HEADER_V3.getBytes(UTF_8), 0, DfsLogger.LOG_FILE_HEADER_V3.length() / 2);
+ wal.close();
+
+ Assert.assertTrue("root user did not have write permission to metadata table",
+ conn.securityOperations().hasTablePermission("root", MetadataTable.NAME, TablePermission.WRITE));
+
+ String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+
+ String tableId = conn.tableOperations().tableIdMap().get(tableName);
+ Assert.assertNotNull("Table ID was null", tableId);
+
+ LogEntry logEntry = new LogEntry(null, 0, "127.0.0.1:12345", partialHeaderWalog.toURI().toString());
+
+ log.info("Taking {} offline", tableName);
+ conn.tableOperations().offline(tableName, true);
+
+ log.info("{} is offline", tableName);
+
+ Text row = MetadataSchema.TabletsSection.getRow(new Text(tableId), null);
+ Mutation m = new Mutation(row);
+ m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
+
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ bw.addMutation(m);
+ bw.close();
+
+ log.info("Bringing {} online", tableName);
+ conn.tableOperations().online(tableName, true);
+
+ log.info("{} is online", tableName);
+
+ // Reading the table implies that recovery completed successfully (the empty file was ignored)
+ // otherwise the tablet will never come online and we won't be able to read it.
+ Scanner s = conn.createScanner(tableName, Authorizations.EMPTY);
+ Assert.assertEquals(0, Iterables.size(s));
+ }
+
+}