You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/04/22 23:08:42 UTC
[07/15] git commit: Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT
Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/101cd1fa
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/101cd1fa
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/101cd1fa
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 101cd1faad447c17191b203fb55096b8bd083ad0
Parents: 0aa1d03 5b32fd2
Author: Sean Busbey <bu...@cloudera.com>
Authored: Tue Apr 15 15:55:59 2014 -0500
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Tue Apr 22 14:36:24 2014 -0500
----------------------------------------------------------------------
.../accumulo/core/file/rfile/CreateEmpty.java | 72 +++++++++++++
.../core/file/rfile/RFileOperations.java | 6 +-
test/system/auto/simple/recoverWithEmpty.py | 104 +++++++++++++++++++
3 files changed, 180 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/101cd1fa/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
index 0000000,0000000..7663b2d
new file mode 100644
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
@@@ -1,0 -1,0 +1,72 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.core.file.rfile;
++
++import java.util.ArrayList;
++import java.util.Arrays;
++
++import org.apache.accumulo.core.conf.DefaultConfiguration;
++import org.apache.accumulo.core.file.FileSKVWriter;
++import org.apache.accumulo.core.file.rfile.RFile.Writer;
++import org.apache.accumulo.core.file.rfile.bcfile.TFile;
++import org.apache.accumulo.core.util.CachedConfiguration;
++import org.apache.commons.cli.BasicParser;
++import org.apache.commons.cli.CommandLine;
++import org.apache.commons.cli.HelpFormatter;
++import org.apache.commons.cli.Option;
++import org.apache.commons.cli.Options;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++
++/**
++ * Create an empty RFile for use in recovering from data loss where Accumulo still refers internally to a path.
++ */
++public class CreateEmpty {
++
++  /**
++   * Writes an empty RFile to every path named on the command line.
++   *
++   * <p>
++   * Recognized options are {@code -c/--codec <name>} (compression codec, defaulting to {@link TFile#COMPRESSION_NONE}) and {@code -?/--help}. When help is
++   * requested, or no path is given, the usage text is printed and nothing is created.
++   *
++   * @param args
++   *          options followed by one or more paths, each of which must end in {@code .rf}
++   * @throws IllegalArgumentException
++   *           if any path does not end in {@code .rf}; all paths are checked before the first file is created
++   * @throws Exception
++   *           if the arguments cannot be parsed or a file cannot be written
++   */
++  public static void main(String[] args) throws Exception {
++    Configuration conf = CachedConfiguration.getInstance();
++
++    Options opts = new Options();
++    Option codecOption = new Option("c", "codec", true, "the compression codec to use. one of " + Arrays.toString(TFile.getSupportedCompressionAlgorithms()) + ". defaults to none.");
++    opts.addOption(codecOption);
++    Option help = new Option("?", "help", false, "print this message");
++    opts.addOption(help);
++
++    CommandLine commandLine = new BasicParser().parse(opts, args);
++    String[] paths = commandLine.getArgs();
++    if (commandLine.hasOption(help.getOpt()) || 0 == paths.length) {
++      HelpFormatter formatter = new HelpFormatter();
++      // The space before "[options]" keeps the class name and the option placeholder from running together.
++      formatter.printHelp(120, "$ACCUMULO_HOME/bin/accumulo " + CreateEmpty.class.getName() + " [options] path [path ...]",
++          "", opts,
++          "each path given is a filesystem URL. Relative paths are resolved according to the default filesystem defined in your Hadoop configuration, which is usually an HDFS instance.");
++      // Asking for help must never create files, so stop here rather than falling through to the loops below.
++      return;
++    }
++    String codec = commandLine.getOptionValue(codecOption.getOpt(), TFile.COMPRESSION_NONE);
++
++    // Validate every path up front so a bad argument cannot leave a partially created set of files behind.
++    for (String arg : paths) {
++      if (!arg.endsWith(".rf")) {
++        throw new IllegalArgumentException("File must end with .rf and '" + arg + "' does not.");
++      }
++    }
++
++    for (String arg : paths) {
++      Path path = new Path(arg);
++      // Opening a writer and closing it immediately, with nothing appended, is what produces the empty RFile.
++      FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), codec);
++      writer.close();
++    }
++  }
++
++}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/101cd1fa/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 5374332,0000000..c906522
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@@ -1,128 -1,0 +1,130 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.file.rfile.RFile.Writer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class RFileOperations extends FileOperations { // FileOperations implementation backed by RFile.Reader and RFile.Writer
+
+ private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet(); // shared empty column-family collection used by the seek in openReader
+
+ @Override
+ public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { // conf and acuconf are not consulted
+ return fs.getFileStatus(new Path(file)).getLen(); // length as reported by the file's FileStatus
+ }
+
+ @Override
+ public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { // convenience overload without block caches
+
+ return openIndex(file, fs, conf, acuconf, null, null); // null data cache and null index cache
+ }
+
+ @Override
+ public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) // returns the index iterator of the RFile at 'file'; acuconf is not consulted
+ throws IOException {
+ Path path = new Path(file);
+ // long len = fs.getFileStatus(path).getLen();
+ // FSDataInputStream in = fs.open(path);
+ // Reader reader = new RFile.Reader(in, len , conf);
+ CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache); // block-level reader that is handed both caches
+ final Reader reader = new RFile.Reader(_cbr);
+
+ return reader.getIndex();
+ }
+
+ @Override
+ public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { // convenience overload without block caches
+ return openReader(file, seekToBeginning, fs, conf, acuconf, null, null); // null data cache and null index cache
+ }
+
+ @Override
+ public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, // opens an RFile reader over 'file'; acuconf is not consulted
+ BlockCache dataCache, BlockCache indexCache) throws IOException {
+ Path path = new Path(file);
+
+ CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache);
+ Reader iter = new RFile.Reader(_cbr);
+
+ if (seekToBeginning) { // otherwise the reader is returned without any seek having been issued
+ iter.seek(new Range((Key) null, null), EMPTY_CF_SET, false); // Range built from two null keys, empty column-family set, inclusive = false
+ }
+
+ return iter;
+ }
+
+ @Override
+ public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, // opens a reader without block caches and seeks it to 'range'
+ AccumuloConfiguration tableConf) throws IOException {
+ FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, null, null); // seekToBeginning is false because the explicit seek follows
+ iter.seek(range, columnFamilies, inclusive);
+ return iter;
+ }
+
+ @Override
+ public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf, // same as the overload above, but with caller-supplied block caches
+ AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+ FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, dataCache, indexCache); // seekToBeginning is false because the explicit seek follows
+ iter.seek(range, columnFamilies, inclusive);
+ return iter;
+ }
+
+ @Override
+ public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException { // public entry point: compression comes from the table configuration
++ return openWriter(file, fs, conf, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)); // delegates to the overload that takes the compression name explicitly
++ }
++
++ FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, String compression) throws IOException { // package-private; CreateEmpty calls it with a codec chosen on the command line
+ int hrep = conf.getInt("dfs.replication", -1); // -1 when the Hadoop configuration does not set dfs.replication
+ int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
+ int rep = hrep;
+ if (trep > 0 && trep != hrep) { // a positive table-level replication overrides the Hadoop value
+ rep = trep;
+ }
+ long hblock = conf.getLong("dfs.block.size", 1 << 26); // falls back to 1 << 26 bytes (64 MiB)
+ long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
+ long block = hblock;
+ if (tblock > 0) // a positive table-level block size overrides the Hadoop value
+ block = tblock;
+ int bufferSize = conf.getInt("io.file.buffer.size", 4096); // falls back to 4096 bytes
+
+ long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
+ long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
+
- String compression = acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE);
-
+ CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf); // second create() argument is Hadoop's overwrite flag, passed as false here
+ Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize); // both compressed block sizes are narrowed from long to int
+ return writer;
+ }
+}