Posted to commits@accumulo.apache.org by bu...@apache.org on 2014/04/22 23:08:42 UTC

[07/15] git commit: Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT

Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/101cd1fa
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/101cd1fa
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/101cd1fa

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 101cd1faad447c17191b203fb55096b8bd083ad0
Parents: 0aa1d03 5b32fd2
Author: Sean Busbey <bu...@cloudera.com>
Authored: Tue Apr 15 15:55:59 2014 -0500
Committer: Sean Busbey <bu...@cloudera.com>
Committed: Tue Apr 22 14:36:24 2014 -0500

----------------------------------------------------------------------
 .../accumulo/core/file/rfile/CreateEmpty.java   |  72 +++++++++++++
 .../core/file/rfile/RFileOperations.java        |   6 +-
 test/system/auto/simple/recoverWithEmpty.py     | 104 +++++++++++++++++++
 3 files changed, 180 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/101cd1fa/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
index 0000000,0000000..7663b2d
new file mode 100644
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
@@@ -1,0 -1,0 +1,72 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.core.file.rfile;
++
++import java.util.ArrayList;
++import java.util.Arrays;
++
++import org.apache.accumulo.core.conf.DefaultConfiguration;
++import org.apache.accumulo.core.file.FileSKVWriter;
++import org.apache.accumulo.core.file.rfile.RFile.Writer;
++import org.apache.accumulo.core.file.rfile.bcfile.TFile;
++import org.apache.accumulo.core.util.CachedConfiguration;
++import org.apache.commons.cli.BasicParser;
++import org.apache.commons.cli.CommandLine;
++import org.apache.commons.cli.HelpFormatter;
++import org.apache.commons.cli.Option;
++import org.apache.commons.cli.Options;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++
++/**
++ * Create an empty RFile for use in recovering from data loss where Accumulo still refers internally to a path.
++ */
++public class CreateEmpty {
++
++  public static void main(String[] args) throws Exception {
++    Configuration conf = CachedConfiguration.getInstance();
++
++    Options opts = new Options();
++    Option codecOption = new Option("c", "codec", true, "the compression codec to use. one of " + Arrays.toString(TFile.getSupportedCompressionAlgorithms()) + ". defaults to none.");
++    opts.addOption(codecOption);
++    Option help = new Option("?", "help", false, "print this message");
++    opts.addOption(help);
++
++    CommandLine commandLine = new BasicParser().parse(opts, args);
++    if (commandLine.hasOption(help.getOpt()) || 0 == commandLine.getArgs().length) {
++      HelpFormatter formatter = new HelpFormatter();
++      formatter.printHelp(120, "$ACCUMULO_HOME/bin/accumulo " + CreateEmpty.class.getName() + " [options] path [path ...]",
++          "", opts,
++          "each path given is a filesystem URL. Relative paths are resolved according to the default filesytem defined in your Hadoop configuration, which is usually an HDFS instance.");
++    }
++    String codec = commandLine.getOptionValue(codecOption.getOpt(), TFile.COMPRESSION_NONE);
++
++    for (String arg : commandLine.getArgs()) {
++      if (!arg.endsWith(".rf")) {
++        throw new IllegalArgumentException("File must end with .rf and '" + arg + "' does not.");
++      }
++    }
++
++    for (String arg : commandLine.getArgs()) {
++      Path path = new Path(arg);
++      FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), codec);
++      writer.close();
++    }
++  }
++
++}

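The help text in main() above shows how the tool is meant to be launched. A hypothetical invocation against a single missing file (the HDFS path here is purely illustrative, not from this commit) would look like:

    $ACCUMULO_HOME/bin/accumulo org.apache.accumulo.core.file.rfile.CreateEmpty --codec none /accumulo/tables/1/default_tablet/F0000000.rf

Every argument must end in .rf; relative paths resolve against the default filesystem in the Hadoop configuration, and the codec falls back to none when the option is omitted.
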
http://git-wip-us.apache.org/repos/asf/accumulo/blob/101cd1fa/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 5374332,0000000..c906522
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@@ -1,128 -1,0 +1,130 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.file.rfile;
 +
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.file.FileSKVIterator;
 +import org.apache.accumulo.core.file.FileSKVWriter;
 +import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 +import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 +import org.apache.accumulo.core.file.rfile.RFile.Reader;
 +import org.apache.accumulo.core.file.rfile.RFile.Writer;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +
 +public class RFileOperations extends FileOperations {
 +  
 +  private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
 +  
 +  @Override
 +  public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
 +    return fs.getFileStatus(new Path(file)).getLen();
 +  }
 +  
 +  @Override
 +  public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
 +    
 +    return openIndex(file, fs, conf, acuconf, null, null);
 +  }
 +  
 +  @Override
 +  public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache)
 +      throws IOException {
 +    Path path = new Path(file);
 +    // long len = fs.getFileStatus(path).getLen();
 +    // FSDataInputStream in = fs.open(path);
 +    // Reader reader = new RFile.Reader(in, len , conf);
 +    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache);
 +    final Reader reader = new RFile.Reader(_cbr);
 +    
 +    return reader.getIndex();
 +  }
 +  
 +  @Override
 +  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
 +    return openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
 +  }
 +  
 +  @Override
 +  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
 +      BlockCache dataCache, BlockCache indexCache) throws IOException {
 +    Path path = new Path(file);
 +    
 +    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache);
 +    Reader iter = new RFile.Reader(_cbr);
 +    
 +    if (seekToBeginning) {
 +      iter.seek(new Range((Key) null, null), EMPTY_CF_SET, false);
 +    }
 +    
 +    return iter;
 +  }
 +  
 +  @Override
 +  public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
 +      AccumuloConfiguration tableConf) throws IOException {
 +    FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, null, null);
 +    iter.seek(range, columnFamilies, inclusive);
 +    return iter;
 +  }
 +  
 +  @Override
 +  public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
 +      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
 +    FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, dataCache, indexCache);
 +    iter.seek(range, columnFamilies, inclusive);
 +    return iter;
 +  }
 +  
 +  @Override
 +  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
++    return openWriter(file, fs, conf, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
++  }
++
++  FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, String compression) throws IOException {
 +    int hrep = conf.getInt("dfs.replication", -1);
 +    int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
 +    int rep = hrep;
 +    if (trep > 0 && trep != hrep) {
 +      rep = trep;
 +    }
 +    long hblock = conf.getLong("dfs.block.size", 1 << 26);
 +    long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
 +    long block = hblock;
 +    if (tblock > 0)
 +      block = tblock;
 +    int bufferSize = conf.getInt("io.file.buffer.size", 4096);
 +    
 +    long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
 +    long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
 +    
-     String compression = acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE);
-     
 +    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf);
 +    Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize);
 +    return writer;
 +  }
 +}
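
The codec-aware openWriter overload above is the hook CreateEmpty relies on. A minimal sketch of calling it directly follows; the class name and target path are illustrative, and the sketch has to live in org.apache.accumulo.core.file.rfile because the overload is package-private:

    package org.apache.accumulo.core.file.rfile; // required: the codec overload has package visibility

    import org.apache.accumulo.core.conf.DefaultConfiguration;
    import org.apache.accumulo.core.file.FileSKVWriter;
    import org.apache.accumulo.core.file.rfile.bcfile.TFile;
    import org.apache.accumulo.core.util.CachedConfiguration;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class EmptyRFileSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = CachedConfiguration.getInstance();
        Path path = new Path("/tmp/example-empty.rf"); // illustrative path, not from the commit

        // Open a writer with an explicit codec (no compression here) and close it
        // immediately, leaving a structurally valid but empty RFile behind.
        FileSKVWriter writer = new RFileOperations().openWriter(path.toString(), path.getFileSystem(conf),
            conf, DefaultConfiguration.getDefaultConfiguration(), TFile.COMPRESSION_NONE);
        writer.close();
      }
    }

Threading the codec through as a parameter, instead of always reading Property.TABLE_FILE_COMPRESSION_TYPE inside openWriter, is what lets CreateEmpty default to no compression while the public openWriter keeps using the table's configured codec.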