You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/04/22 23:08:48 UTC
[13/15] git commit: ACCUMULO-2654 adds an IT to the functional suite
to cover recovery via empty rfile.
ACCUMULO-2654 adds an IT to the functional suite to cover recovery via empty rfile.
Had to update to use bcfile.Compression directly, since TFile ceased to be.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/35b0549b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/35b0549b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/35b0549b
Branch: refs/heads/master
Commit: 35b0549ba562334e46944d2a1fbe406bf96349a4
Parents: 1ed463e
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Apr 16 17:01:29 2014 -0500
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Tue Apr 22 15:30:23 2014 -0500
----------------------------------------------------------------------
.../accumulo/core/file/rfile/CreateEmpty.java | 11 +-
.../core/file/rfile/bcfile/Compression.java | 2 +-
.../accumulo/test/functional/ReadWriteIT.java | 2 +-
.../functional/RecoveryWithEmptyRFileIT.java | 144 +++++++++++++++++++
4 files changed, 153 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/35b0549b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
index 09a2d61..058bb84 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
@@ -24,11 +24,12 @@ import org.apache.accumulo.core.cli.Help;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.file.rfile.RFile.Writer;
-import org.apache.accumulo.core.file.rfile.bcfile.TFile;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
import com.beust.jcommander.IParameterValidator;
import com.beust.jcommander.Parameter;
@@ -38,6 +39,7 @@ import com.beust.jcommander.ParameterException;
* Create an empty RFile for use in recovering from data loss where Accumulo still refers internally to a path.
*/
public class CreateEmpty {
+ private static final Logger log = Logger.getLogger(CreateEmpty.class);
public static class NamedLikeRFile implements IParameterValidator {
@Override
@@ -51,16 +53,16 @@ public class CreateEmpty {
public static class IsSupportedCompressionAlgorithm implements IParameterValidator {
@Override
public void validate(String name, String value) throws ParameterException {
- String[] algorithms = TFile.getSupportedCompressionAlgorithms();
+ String[] algorithms = Compression.getSupportedAlgorithms();
if (!((Arrays.asList(algorithms)).contains(value))) {
- throw new ParameterException("Compression codec must be one of " + Arrays.toString(TFile.getSupportedCompressionAlgorithms()));
+ throw new ParameterException("Compression codec must be one of " + Arrays.toString(algorithms));
}
}
}
static class Opts extends Help {
@Parameter(names = {"-c", "--codec"}, description = "the compression codec to use.", validateWith = IsSupportedCompressionAlgorithm.class)
- String codec = TFile.COMPRESSION_NONE;
+ String codec = Compression.COMPRESSION_NONE;
@Parameter(description = " <path> { <path> ... } Each path given is a URL. Relative paths are resolved according to the default filesystem defined in your Hadoop configuration, which is usually an HDFS instance.", required = true, validateWith = NamedLikeRFile.class)
List<String> files = new ArrayList<String>();
}
@@ -73,6 +75,7 @@ public class CreateEmpty {
for (String arg : opts.files) {
Path path = new Path(arg);
+ log.info("Writing to file '" + path + "'");
FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), opts.codec);
writer.close();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/35b0549b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index 66ca07f..5288bbb 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@ -378,7 +378,7 @@ public final class Compression {
throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
}
- static String[] getSupportedAlgorithms() {
+ public static String[] getSupportedAlgorithms() {
Algorithm[] algos = Algorithm.class.getEnumConstants();
ArrayList<String> ret = new ArrayList<String>();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/35b0549b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
index e4fe57c..6dbecfa 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@ -100,7 +100,7 @@ public class ReadWriteIT extends ConfigurableMacIT {
TestIngest.ingest(connector, opts, new BatchWriterOpts());
}
- private static void verify(Connector connector, int rows, int cols, int width, int offset) throws Exception {
+ public static void verify(Connector connector, int rows, int cols, int width, int offset) throws Exception {
verify(connector, rows, cols, width, offset, COLF);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/35b0549b/test/src/test/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java b/test/src/test/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
new file mode 100644
index 0000000..abe6950
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.rfile.CreateEmpty;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.monitor.Monitor;
+import org.apache.accumulo.server.util.Admin;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.TestMultiTableIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+/**
+ XXX As a part of verifying lossy recovery via inserting an empty rfile,
+ this test deletes the rfiles backing the test table's tablets. This requires
+ write access to the backing files of the test Accumulo mini cluster.
+
+ This test should read the file location from the test harness, and that
+ file should be on the local filesystem. If you want to take a paranoid
+ approach, just make sure the test user doesn't have write access to the
+ HDFS files of any colocated live Accumulo instance or to any important
+ local filesystem files.
+*/
+public class RecoveryWithEmptyRFileIT extends ConfigurableMacIT {
+ private static final Logger log = Logger.getLogger(RecoveryWithEmptyRFileIT.class);
+
+ static final int ROWS = 200000;
+ static final int COLS = 1;
+ static final String COLF = "colf";
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 2 * 60;
+ }
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.useMiniDFS(true);
+ }
+
+ @Test
+ public void replaceMissingRFile() throws Exception {
+ log.info("Ingest some data, verify it was stored properly, replace an underlying rfile with an empty one and verify we can scan.");
+ Connector connector = getConnector();
+ ReadWriteIT.ingest(connector, ROWS, COLS, 50, 0);
+ ReadWriteIT.verify(connector, ROWS, COLS, 50, 0);
+
+ connector.tableOperations().flush("test_ingest", null, null, true);
+ connector.tableOperations().offline("test_ingest", true);
+
+ log.debug("Replacing rfile(s) with empty");
+ Scanner meta = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ String tableId = connector.tableOperations().tableIdMap().get("test_ingest");
+ meta.setRange(new Range(new Text(tableId + ";"), new Text(tableId + "<")));
+ meta.fetchColumnFamily(DataFileColumnFamily.NAME);
+ boolean foundFile = false;
+ for (Entry<Key,Value> entry : meta) {
+ foundFile = true;
+ Path rfile = new Path(entry.getKey().getColumnQualifier().toString());
+ log.debug("Removing rfile '" + rfile + "'");
+ cluster.getFileSystem().delete(rfile, false);
+ Process info = cluster.exec(CreateEmpty.class, rfile.toString());
+ assertEquals(0, info.waitFor());
+ }
+ meta.close();
+ assertTrue(foundFile);
+
+ if(log.isTraceEnabled()) {
+ log.trace("Enumerating backing filesystem paths:");
+ RemoteIterator<LocatedFileStatus> paths = cluster.getFileSystem().listFiles(new Path("/"), true);
+ while (paths.hasNext()) {
+ FileStatus path = paths.next();
+ log.trace(path.toString());
+ }
+ }
+ log.trace("invalidate cached file handles by issuing a compaction");
+ connector.tableOperations().online("test_ingest", true);
+ connector.tableOperations().compact("test_ingest", null, null, false, true);
+
+ log.debug("make sure we can still scan");
+ Scanner scan = connector.createScanner("test_ingest", Authorizations.EMPTY);
+ scan.setRange(new Range());
+ long cells = 0l;
+ for (Entry<Key,Value> entry : scan) {
+ cells++;
+ }
+ scan.close();
+ assertEquals(0l, cells);
+ FileSystem.closeAll();
+ }
+
+}