You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/04/22 23:08:38 UTC
[03/15] git commit: ACCUMULO-2654 Adds utility for creating empty
rfile.
ACCUMULO-2654 Adds utility for creating empty rfile.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5b32fd22
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5b32fd22
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5b32fd22
Branch: refs/heads/1.5.2-SNAPSHOT
Commit: 5b32fd22b928caa44ea1535851b57cede5f34c5f
Parents: 3648c1b
Author: Sean Busbey <bu...@cloudera.com>
Authored: Wed Apr 9 11:34:56 2014 -0500
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Tue Apr 22 12:25:44 2014 -0500
----------------------------------------------------------------------
.../accumulo/core/file/rfile/CreateEmpty.java | 72 +++++++++++++
.../core/file/rfile/RFileOperations.java | 6 +-
test/system/auto/simple/recoverWithEmpty.py | 104 +++++++++++++++++++
3 files changed, 180 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5b32fd22/src/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java b/src/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
new file mode 100644
index 0000000..7663b2d
--- /dev/null
+++ b/src/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile.Writer;
+import org.apache.accumulo.core.file.rfile.bcfile.TFile;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Command-line utility that writes an empty RFile at each path it is given.
+ *
+ * <p>Intended for recovering from data loss where Accumulo still refers internally to a path: putting an empty RFile
+ * at that path gives Accumulo a readable file again. The data that was in the lost file is not restored.
+ */
+public class CreateEmpty {
+
+  /**
+   * Parses the command line and creates one empty RFile per path argument.
+   *
+   * <p>Usage: {@code CreateEmpty [-c codec] path [path ...]}. Every path must end in {@code .rf}, and the codec
+   * (default {@link TFile#COMPRESSION_NONE}) must be one of {@link TFile#getSupportedCompressionAlgorithms()}. All
+   * arguments are validated before any file is written. When {@code -?} is given, or no paths are given, only the usage
+   * message is printed and nothing is written.
+   *
+   * @param args command-line arguments
+   * @throws IllegalArgumentException if a path does not end in {@code .rf} or the codec is not supported
+   * @throws Exception if argument parsing fails or a file cannot be written
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = CachedConfiguration.getInstance();
+
+    Options opts = new Options();
+    Option codecOption = new Option("c", "codec", true, "the compression codec to use. one of "
+        + Arrays.toString(TFile.getSupportedCompressionAlgorithms()) + ". defaults to none.");
+    opts.addOption(codecOption);
+    Option help = new Option("?", "help", false, "print this message");
+    opts.addOption(help);
+
+    CommandLine commandLine = new BasicParser().parse(opts, args);
+    if (commandLine.hasOption(help.getOpt()) || 0 == commandLine.getArgs().length) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp(120, "$ACCUMULO_HOME/bin/accumulo " + CreateEmpty.class.getName() + " [options] path [path ...]",
+          "", opts,
+          "each path given is a filesystem URL. Relative paths are resolved according to the default filesystem defined in your Hadoop configuration, which is usually an HDFS instance.");
+      // Help was requested or there is nothing to create; never fall through and write files.
+      return;
+    }
+
+    for (String arg : commandLine.getArgs()) {
+      if (!arg.endsWith(".rf")) {
+        throw new IllegalArgumentException("File must end with .rf and '" + arg + "' does not.");
+      }
+    }
+
+    String codec = commandLine.getOptionValue(codecOption.getOpt(), TFile.COMPRESSION_NONE);
+    // RFileOperations.openWriter calls fs.create() before the codec is consulted, so an unsupported codec would
+    // otherwise fail only after an output file had already been created. Reject it before touching the filesystem.
+    if (!Arrays.asList(TFile.getSupportedCompressionAlgorithms()).contains(codec)) {
+      throw new IllegalArgumentException("Compression codec '" + codec + "' is not one of "
+          + Arrays.toString(TFile.getSupportedCompressionAlgorithms()) + ".");
+    }
+
+    RFileOperations fileOps = new RFileOperations();
+    for (String arg : commandLine.getArgs()) {
+      Path path = new Path(arg);
+      // Opening a writer and closing it without appending anything yields an RFile with no entries.
+      FileSKVWriter writer = fileOps.openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), codec);
+      writer.close();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5b32fd22/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 5374332..c906522 100644
--- a/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/src/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -103,6 +103,10 @@ public class RFileOperations extends FileOperations {
@Override
public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+ // Delegates to the overload below, taking the compression codec from the table configuration (TABLE_FILE_COMPRESSION_TYPE).
+ return openWriter(file, fs, conf, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+ }
+
+ FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, String compression) throws IOException {
int hrep = conf.getInt("dfs.replication", -1);
int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
int rep = hrep;
@@ -119,8 +123,6 @@ public class RFileOperations extends FileOperations {
long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
- String compression = acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE);
-
CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf);
Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize);
return writer;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5b32fd22/test/system/auto/simple/recoverWithEmpty.py
----------------------------------------------------------------------
diff --git a/test/system/auto/simple/recoverWithEmpty.py b/test/system/auto/simple/recoverWithEmpty.py
new file mode 100755
index 0000000..18ac055
--- /dev/null
+++ b/test/system/auto/simple/recoverWithEmpty.py
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import logging
+import unittest
+import time
+import sys
+
+from TestUtils import TestUtilsMixin, ROOT, ROOT_PASSWORD, SITE_PATH
+
+# Module logger for this test, under the 'test.auto' logger name.
+log = logging.getLogger('test.auto')
+
+# XXX As a part of verifying lossy recovery via inserting an empty rfile,
+# this test deletes the rfile backing the test table's default tablet in HDFS.
+# It requires write access to the backing files of the test Accumulo
+# instance in HDFS.
+#
+# This test should read instance.dfs.dir properly from the test harness, but
+# if you want to take a paranoid approach just make sure the test user
+# doesn't have write access to the HDFS files of any colocated live
+# Accumulo instance.
+class RecoverWithEmptyTest(unittest.TestCase, TestUtilsMixin):
+    "Ingest some data, verify it was stored properly, replace an underlying rfile with an empty one and verify we can scan."
+    order = 95
+
+    def add_options(self, parser):
+        # Register -c/--rows and -n/--size only if no other test has already added them.
+        if not parser.has_option('-c'):
+            parser.add_option('-c', '--rows', dest='rows',
+                              default=20000, type=int,
+                              help="The number of rows to write "
+                              "when testing (%default)")
+        if not parser.has_option('-n'):
+            parser.add_option('-n', '--size', dest='size',
+                              default=50, type=int,
+                              help="The size of values to write "
+                              "when testing (%default)")
+
+    def setUp(self):
+        TestUtilsMixin.setUp(self)
+        # initialize the database
+        self.createTable('test_ingest')
+        # start test ingestion
+        log.info("Starting Test Ingester")
+        self.ingester = self.ingest(self.masterHost(),
+                                    self.options.rows,
+                                    size=self.options.size)
+
+    def tearDown(self):
+        TestUtilsMixin.tearDown(self)
+        self.pkill(self.masterHost(), 'TestIngest')
+
+    def waitTime(self):
+        # Timeout that scales with the row count; units follow waitForStop's timeout (presumably seconds).
+        return 1000*120 * self.options.rows / 1e6 + 30
+
+    def runTest(self):
+        waitTime = self.waitTime()
+
+        self.waitForStop(self.ingester, waitTime)
+
+        log.info("Verifying Ingestion")
+        self.waitForStop(self.verify(self.masterHost(),
+                                     self.options.rows,
+                                     size=self.options.size),
+                         waitTime)
+        log.info("Replacing rfile with empty")
+        # Without -w the shell returns as soon as the flush is initiated, so the !METADATA scan
+        # below could run before the flushed rfile is recorded.
+        out,err,code = self.shell(self.masterHost(), 'flush -t test_ingest -w\n')
+        self.processResult(out,err,code)
+        out,err = self.waitForStop(self.runOn(self.masterHost(), [self.accumulo_sh(), 'shell', '-u', ROOT, '-p', ROOT_PASSWORD, '-e', 'scan -t !METADATA']), waitTime)
+        self.assertTrue("%s< file:/default_tablet/F0000000.rf" % self.getTableId('test_ingest') in out,
+                        "Test assumptions about the rfiles backing our test table are wrong. please file a bug.")
+        # NOTE(review): offline may return before the tablet is fully unloaded; confirm the
+        # rfile swap below cannot race the unload.
+        out,err,code = self.shell(self.masterHost(), 'offline -t test_ingest\n')
+        self.processResult(out,err,code)
+        import config
+        rfile = "%s/tables/%s/default_tablet/F0000000.rf" % (config.parse(SITE_PATH)['instance.dfs.dir'], self.getTableId('test_ingest'))
+        log.info("Removing rfile '%s'" % rfile)
+        self.waitForStop(self.runOn(self.masterHost(), ['hadoop', 'fs', '-rm', rfile]), waitTime)
+        # Put an empty rfile at the same path so the metadata reference resolves again.
+        self.waitForStop(self.runClassOn(self.masterHost(),
+                                         "org.apache.accumulo.core.file.rfile.CreateEmpty",
+                                         [rfile]),
+                         waitTime)
+        log.info("Make sure we can still scan")
+        out,err,code = self.shell(self.masterHost(), 'online -t test_ingest\n')
+        self.processResult(out,err,code)
+        out,err = self.waitForStop(self.runOn(self.masterHost(), [self.accumulo_sh(), 'shell', '-u', ROOT, '-p', ROOT_PASSWORD, '-e', 'scan -t test_ingest']), waitTime)
+        # The only rfile was replaced by an empty one, so the scan should print nothing.
+        self.assertTrue(len(out) == 0, "Expected no rows after recovery, got: %r" % (out,))
+        self.shutdown_accumulo()
+
+def suite():
+    # The harness entry point: a suite holding the single recovery test case.
+    tests = [RecoverWithEmptyTest()]
+    return unittest.TestSuite(tests)