Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/04/22 23:08:38 UTC

[03/15] git commit: ACCUMULO-2654 Adds utility for creating empty rfile.

ACCUMULO-2654 Adds utility for creating empty rfile.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5b32fd22
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5b32fd22
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5b32fd22

Branch: refs/heads/1.5.2-SNAPSHOT
Commit: 5b32fd22b928caa44ea1535851b57cede5f34c5f
Parents: 3648c1b
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Apr 9 11:34:56 2014 -0500
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Tue Apr 22 12:25:44 2014 -0500

----------------------------------------------------------------------
 .../accumulo/core/file/rfile/CreateEmpty.java   |  72 +++++++++++++
 .../core/file/rfile/RFileOperations.java        |   6 +-
 test/system/auto/simple/recoverWithEmpty.py     | 104 +++++++++++++++++++
 3 files changed, 180 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5b32fd22/src/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java b/src/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
new file mode 100644
index 0000000..7663b2d
--- /dev/null
+++ b/src/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile.Writer;
+import org.apache.accumulo.core.file.rfile.bcfile.TFile;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Create an empty RFile for use in recovering from data loss where Accumulo still refers internally to a path.
+ */
+public class CreateEmpty {
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = CachedConfiguration.getInstance();
+
+    Options opts = new Options();
+    Option codecOption = new Option("c", "codec", true, "the compression codec to use, one of " + Arrays.toString(TFile.getSupportedCompressionAlgorithms()) + "; defaults to none.");
+    opts.addOption(codecOption);
+    Option help = new Option("?", "help", false, "print this message");
+    opts.addOption(help);
+
+    CommandLine commandLine = new BasicParser().parse(opts, args);
+    if (commandLine.hasOption(help.getOpt()) || 0 == commandLine.getArgs().length) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(120, "$ACCUMULO_HOME/bin/accumulo " + CreateEmpty.class.getName() + " [options] path [path ...]",
+          "", opts,
+          "each path given is a filesystem URL. Relative paths are resolved according to the default filesytem defined in your Hadoop configuration, which is usually an HDFS instance.");
+    }
+    String codec = commandLine.getOptionValue(codecOption.getOpt(), TFile.COMPRESSION_NONE);
+
+    for (String arg : commandLine.getArgs()) {
+      if (!arg.endsWith(".rf")) {
+        throw new IllegalArgumentException("File must end with .rf and '" + arg + "' does not.");
+      }
+    }
+
+    for (String arg : commandLine.getArgs()) {
+      Path path = new Path(arg);
+      FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), codec);
+      writer.close();
+    }
+  }
+
+}
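
For reference, the utility is meant to be run through the Accumulo launcher script, as the usage string above indicates. A hypothetical invocation (the HDFS path below is made up for illustration; any real argument must end in .rf, as enforced by the check above):

  $ACCUMULO_HOME/bin/accumulo org.apache.accumulo.core.file.rfile.CreateEmpty /accumulo/tables/2/default_tablet/F0000000.rf

  $ACCUMULO_HOME/bin/accumulo org.apache.accumulo.core.file.rfile.CreateEmpty -c gz /accumulo/tables/2/default_tablet/F0000000.rf

The first form writes an uncompressed empty rfile (the codec defaults to none); the second assumes gz is among the codecs reported by TFile.getSupportedCompressionAlgorithms().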

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5b32fd22/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 5374332..c906522 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -103,6 +103,10 @@ public class RFileOperations extends FileOperations {
   
   @Override
   public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    return openWriter(file, fs, conf, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+  }
+
+  FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, String compression) throws IOException {
     int hrep = conf.getInt("dfs.replication", -1);
     int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
     int rep = hrep;
@@ -119,8 +123,6 @@ public class RFileOperations extends FileOperations {
     long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
     long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
     
-    String compression = acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE);
-    
     CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf);
     Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize);
     return writer;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5b32fd22/test/system/auto/simple/recoverWithEmpty.py
----------------------------------------------------------------------
diff --git a/test/system/auto/simple/recoverWithEmpty.py b/test/system/auto/simple/recoverWithEmpty.py
new file mode 100755
index 0000000..18ac055
--- /dev/null
+++ b/test/system/auto/simple/recoverWithEmpty.py
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import logging
+import unittest
+import time
+import sys
+
+from TestUtils import TestUtilsMixin, ROOT, ROOT_PASSWORD, SITE_PATH
+
+log = logging.getLogger('test.auto')
+
+# XXX As a part of verifying lossy recovery via inserting an empty rfile,
+# this test deletes test table tablets within HDFS.
+# It will require write access to the backing files of the test Accumulo
+# instance in HDFS.
+#
+# This test should read instance.dfs.dir properly from the test harness, but
+# if you want to take a paranoid approach, just make sure the test user
+# doesn't have write access to the HDFS files of any colocated live
+# Accumulo instance.
+class RecoverWithEmptyTest(unittest.TestCase, TestUtilsMixin):
+    "Ingest some data, verify it was stored properly, replace an underlying rfile with an empty one and verify we can scan."
+    order = 95
+
+    def add_options(self, parser):
+        if not parser.has_option('-c'):
+            parser.add_option('-c', '--rows', dest='rows',
+                              default=20000, type=int,
+                              help="The number of rows to write "
+                              "when testing (%default)")
+        if not parser.has_option('-n'):
+            parser.add_option('-n', '--size', dest='size',
+                              default=50, type=int,
+                              help="The size of values to write "
+                              "when testing (%default)")
+    def setUp(self):
+        TestUtilsMixin.setUp(self)
+        # initialize the database
+        self.createTable('test_ingest')
+        # start test ingestion
+        log.info("Starting Test Ingester")
+        self.ingester = self.ingest(self.masterHost(),
+                                    self.options.rows,
+                                    size=self.options.size)
+
+    def tearDown(self):
+        TestUtilsMixin.tearDown(self)
+        self.pkill(self.masterHost(), 'TestIngest')
+
+    def waitTime(self):
+        return 1000*120 * self.options.rows / 1e6 + 30
+
+    def runTest(self):
+        waitTime = self.waitTime()
+
+        self.waitForStop(self.ingester, waitTime)
+
+        log.info("Verifying Ingestion")
+        self.waitForStop(self.verify(self.masterHost(),
+                                     self.options.rows,
+                                     size=self.options.size),
+                         waitTime)
+        log.info("Replacing rfile with empty")
+        out,err,code = self.shell(self.masterHost(), 'flush -t test_ingest\n')
+        self.processResult(out,err,code)
+        out,err = self.waitForStop(self.runOn(self.masterHost(), [self.accumulo_sh(), 'shell', '-u', ROOT, '-p', ROOT_PASSWORD, '-e', 'scan -t !METADATA']), waitTime)
+        self.failUnless(out.find("%s< file:/default_tablet/F0000000.rf" % self.getTableId('test_ingest')) >= 0,
+                                 "Test assumptions about the rfiles backing our test table are wrong. please file a bug.")
+        out,err,code = self.shell(self.masterHost(), 'offline -t test_ingest\n')
+        self.processResult(out,err,code)
+        import config
+        rfile = "%s/tables/%s/default_tablet/F0000000.rf" % (config.parse(SITE_PATH)['instance.dfs.dir'], self.getTableId('test_ingest'))
+        log.info("Removing rfile '%s'" % rfile)
+        self.waitForStop(self.runOn(self.masterHost(), ['hadoop', 'fs', '-rm', rfile]), waitTime)
+        self.waitForStop(self.runClassOn(self.masterHost(),
+                                         "org.apache.accumulo.core.file.rfile.CreateEmpty",
+                                         [rfile]),
+                         waitTime)
+        log.info("Make sure we can still scan")
+        out,err,code = self.shell(self.masterHost(), 'online -t test_ingest\n')
+        self.processResult(out,err,code)
+        out,err = self.waitForStop(self.runOn(self.masterHost(), [self.accumulo_sh(), 'shell', '-u', ROOT, '-p', ROOT_PASSWORD, '-e', 'scan -t test_ingest']), waitTime)
+        self.failUnless(len(out) == 0)
+        self.shutdown_accumulo()
+
+def suite():
+    result = unittest.TestSuite()
+    result.addTest(RecoverWithEmptyTest())
+    return result
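
The recovery procedure exercised by the test above amounts to the following sequence; the table name and rfile path are placeholders taken from the test (the real path depends on instance.dfs.dir and the table id):

  # in the Accumulo shell
  flush -t test_ingest
  offline -t test_ingest

  # from the command line, replace the lost or corrupt rfile with an empty one
  hadoop fs -rm <instance.dfs.dir>/tables/<tableId>/default_tablet/F0000000.rf
  $ACCUMULO_HOME/bin/accumulo org.apache.accumulo.core.file.rfile.CreateEmpty <instance.dfs.dir>/tables/<tableId>/default_tablet/F0000000.rf

  # back in the Accumulo shell
  online -t test_ingest
  scan -t test_ingest

Any data that was only in the removed file is lost; the final scan verifies that the tablet comes back online and is scannable, which is why the test expects empty scan output.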