Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/04/22 23:08:48 UTC

[13/15] git commit: ACCUMULO-2654 adds an IT to the functional suite to cover recovery via empty rfile.

ACCUMULO-2654 adds an IT to the functional suite to cover recovery via empty rfile.

Had to update to use bcfile.Compression directly, since the TFile class no longer exists.
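
Concretely, call sites that enumerated codecs through TFile now go through bcfile's Compression class; a minimal sketch of the replacement call, grounded in the diff below (the resulting set depends on which codecs are on the classpath):

    // List the codec names supported by Accumulo's bcfile layer,
    // e.g. gz, lzo, snappy, none, depending on the classpath.
    String[] algorithms = Compression.getSupportedAlgorithms();
    System.out.println("Supported codecs: " + Arrays.toString(algorithms));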


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/35b0549b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/35b0549b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/35b0549b

Branch: refs/heads/master
Commit: 35b0549ba562334e46944d2a1fbe406bf96349a4
Parents: 1ed463e
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Apr 16 17:01:29 2014 -0500
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Tue Apr 22 15:30:23 2014 -0500

----------------------------------------------------------------------
 .../accumulo/core/file/rfile/CreateEmpty.java   |  11 +-
 .../core/file/rfile/bcfile/Compression.java     |   2 +-
 .../accumulo/test/functional/ReadWriteIT.java   |   2 +-
 .../functional/RecoveryWithEmptyRFileIT.java    | 144 +++++++++++++++++++
 4 files changed, 153 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/35b0549b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
index 09a2d61..058bb84 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
@@ -24,11 +24,12 @@ import org.apache.accumulo.core.cli.Help;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.rfile.RFile.Writer;
-import org.apache.accumulo.core.file.rfile.bcfile.TFile;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
 
 import com.beust.jcommander.IParameterValidator;
 import com.beust.jcommander.Parameter;
@@ -38,6 +39,7 @@ import com.beust.jcommander.ParameterException;
  * Create an empty RFile for use in recovering from data loss where Accumulo still refers internally to a path.
  */
 public class CreateEmpty {
+  private static final Logger log = Logger.getLogger(CreateEmpty.class);
 
   public static class NamedLikeRFile implements IParameterValidator {
     @Override
@@ -51,16 +53,16 @@ public class CreateEmpty {
   public static class IsSupportedCompressionAlgorithm implements IParameterValidator {
     @Override
     public void validate(String name, String value) throws ParameterException {
-      String[] algorithms = TFile.getSupportedCompressionAlgorithms();
+      String[] algorithms = Compression.getSupportedAlgorithms();
       if (!((Arrays.asList(algorithms)).contains(value))) {
-        throw new ParameterException("Compression codec must be one of " + Arrays.toString(TFile.getSupportedCompressionAlgorithms()));
+        throw new ParameterException("Compression codec must be one of " + Arrays.toString(algorithms));
       }
     }
   }
 
   static class Opts extends Help {
     @Parameter(names = {"-c", "--codec"}, description = "the compression codec to use.", validateWith = IsSupportedCompressionAlgorithm.class)
-    String codec = TFile.COMPRESSION_NONE;
+    String codec = Compression.COMPRESSION_NONE;
     @Parameter(description = " <path> { <path> ... } Each path given is a URL. Relative paths are resolved according to the default filesystem defined in your Hadoop configuration, which is usually an HDFS instance.", required = true, validateWith = NamedLikeRFile.class)
     List<String> files = new ArrayList<String>();
   }
@@ -73,6 +75,7 @@ public class CreateEmpty {
 
     for (String arg : opts.files) {
       Path path = new Path(arg);
+      log.info("Writing to file '" + path + "'");
       FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), opts.codec);
       writer.close();
     }
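
In use, CreateEmpty takes one or more rfile paths as arguments; a hedged sketch of driving it programmatically through its main entry point (the HDFS path is hypothetical, and the file name must end in .rf to satisfy the NamedLikeRFile validator):

    // Hypothetical invocation: create one empty, uncompressed rfile.
    // Relative paths resolve against the default filesystem in the Hadoop config.
    CreateEmpty.main(new String[] {"--codec", "none",
        "hdfs://localhost:8020/accumulo/tables/2/t-0001/F0000001.rf"});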

http://git-wip-us.apache.org/repos/asf/accumulo/blob/35b0549b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
index 66ca07f..5288bbb 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@ -378,7 +378,7 @@ public final class Compression {
     throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
   }
   
-  static String[] getSupportedAlgorithms() {
+  public static String[] getSupportedAlgorithms() {
     Algorithm[] algos = Algorithm.class.getEnumConstants();
     
     ArrayList<String> ret = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/35b0549b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
index e4fe57c..6dbecfa 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@ -100,7 +100,7 @@ public class ReadWriteIT extends ConfigurableMacIT {
     TestIngest.ingest(connector, opts, new BatchWriterOpts());
   }
   
-  private static void verify(Connector connector, int rows, int cols, int width, int offset) throws Exception {
+  public static void verify(Connector connector, int rows, int cols, int width, int offset) throws Exception {
     verify(connector, rows, cols, width, offset, COLF);
   }
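
Widening verify(...) from private to public lets other ITs reuse ReadWriteIT's static helpers; a two-line sketch of that reuse, mirroring RecoveryWithEmptyRFileIT below:

    // Reuse ReadWriteIT's helpers from another IT (width 50, offset 0, as below).
    ReadWriteIT.ingest(connector, ROWS, COLS, 50, 0);
    ReadWriteIT.verify(connector, ROWS, COLS, 50, 0);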
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/35b0549b/test/src/test/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java b/test/src/test/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
new file mode 100644
index 0000000..abe6950
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.rfile.CreateEmpty;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.monitor.Monitor;
+import org.apache.accumulo.server.util.Admin;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.TestMultiTableIngest;
+import org.apache.accumulo.test.VerifyIngest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+/**
+  XXX As part of verifying lossy recovery via inserting an empty rfile,
+  this test deletes the rfiles backing test table tablets. This requires
+  write access to the backing files of the test Accumulo mini cluster.
+
+  This test should read the file location from the test harness, and that
+  file should be on the local filesystem. If you want to take a paranoid
+  approach, just make sure the test user doesn't have write access to the
+  HDFS files of any colocated live Accumulo instance or to any important
+  local filesystem files.
+*/
+public class RecoveryWithEmptyRFileIT extends ConfigurableMacIT {
+  private static final Logger log = Logger.getLogger(RecoveryWithEmptyRFileIT.class);
+
+  static final int ROWS = 200000;
+  static final int COLS = 1;
+  static final String COLF = "colf";
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 2 * 60;
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.useMiniDFS(true);
+  }
+
+  @Test
+  public void replaceMissingRFile() throws Exception {
+    log.info("Ingest some data, verify it was stored properly, replace an underlying rfile with an empty one and verify we can scan.");
+    Connector connector = getConnector();
+    ReadWriteIT.ingest(connector, ROWS, COLS, 50, 0);
+    ReadWriteIT.verify(connector, ROWS, COLS, 50, 0);
+
+    connector.tableOperations().flush("test_ingest", null, null, true);
+    connector.tableOperations().offline("test_ingest", true);
+
+    log.debug("Replacing rfile(s) with empty");
+    Scanner meta = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    String tableId = connector.tableOperations().tableIdMap().get("test_ingest");
+    meta.setRange(new Range(new Text(tableId + ";"), new Text(tableId + "<")));
+    meta.fetchColumnFamily(DataFileColumnFamily.NAME);
+    boolean foundFile = false;
+    for (Entry<Key,Value> entry : meta) {
+      foundFile = true;
+      Path rfile = new Path(entry.getKey().getColumnQualifier().toString());
+      log.debug("Removing rfile '" + rfile + "'");
+      cluster.getFileSystem().delete(rfile, false);
+      Process info = cluster.exec(CreateEmpty.class, rfile.toString());
+      assertEquals(0, info.waitFor());
+    }
+    meta.close();
+    assertTrue(foundFile);
+
+    if (log.isTraceEnabled()) {
+      log.trace("Enumerating backing filesystem paths:");
+      RemoteIterator<LocatedFileStatus> paths = cluster.getFileSystem().listFiles(new Path("/"), true);
+      while (paths.hasNext()) {
+        FileStatus path = paths.next();
+        log.trace(path.toString());
+      }
+    }
+    log.trace("invalidate cached file handles by issuing a compaction");
+    connector.tableOperations().online("test_ingest", true);
+    connector.tableOperations().compact("test_ingest", null, null, false, true);
+
+    log.debug("make sure we can still scan");
+    Scanner scan = connector.createScanner("test_ingest", Authorizations.EMPTY);
+    scan.setRange(new Range());
+    long cells = 0L;
+    for (Entry<Key,Value> entry : scan) {
+      cells++;
+    }
+    scan.close();
+    assertEquals(0L, cells);
+    FileSystem.closeAll();
+  }
+
+}
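
One detail worth calling out from the test above: the metadata range. Tablet entries for a table are keyed <tableId>;<endRow>, and the table's last tablet (which has no end row) is keyed <tableId><; since ';' sorts immediately before '<' in ASCII, the range below spans every tablet of test_ingest:

    // Restating the range used in the test: all tablet rows for a table sort
    // between "<tableId>;" (tablets with an end row) and "<tableId><" (the
    // default/last tablet), so one row-range covers them all.
    meta.setRange(new Range(new Text(tableId + ";"), new Text(tableId + "<")));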