You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2016/06/02 15:16:20 UTC
[2/3] accumulo git commit: ACCUMULO-4165 Added a user facing API for
RFile
ACCUMULO-4165 Added a user facing API for RFile
Squashed commit of the following:
commit c96cda97507fc611bfdf1699ea022769072adc55
Author: Keith Turner <kt...@apache.org>
Date: Tue May 31 17:53:20 2016 -0400
Multiple improvements based on code review.
* Made AccumuloFileOutputFormat use new RFile public API.
* Moved checking of visibility validity to within new RFile API impl
* Added test for error conditions and documented expected exceptions.
commit 005ebb2c0e47e4cf818c955f292c71696a4fff41
Author: Keith Turner <kt...@apache.org>
Date: Tue May 31 14:35:18 2016 -0400
updates based on code review comments
commit a5c6ece070fc44923758a1da4aa50849f872fdf4
Author: Keith Turner <kt...@apache.org>
Date: Fri May 27 15:41:06 2016 -0400
added a test
commit 911c64cd714364707e1258dcf627b151630a18bf
Author: Keith Turner <kt...@apache.org>
Date: Fri May 27 14:38:28 2016 -0400
ACCUMULO-4165 Added a user facing API for RFile
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/61a7de4a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/61a7de4a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/61a7de4a
Branch: refs/heads/master
Commit: 61a7de4a167571bea16f2f130ddaaa8768562148
Parents: c8621bb
Author: Keith Turner <ke...@deenlo.com>
Authored: Thu Jun 2 11:03:37 2016 -0400
Committer: Keith Turner <ke...@deenlo.com>
Committed: Thu Jun 2 11:03:37 2016 -0400
----------------------------------------------------------------------
.../client/admin/NewTableConfiguration.java | 14 +-
.../core/client/admin/TableOperations.java | 4 +-
.../core/client/impl/OfflineIterator.java | 14 +-
.../client/mapred/AccumuloFileOutputFormat.java | 25 +-
.../mapreduce/AccumuloFileOutputFormat.java | 27 +-
.../accumulo/core/client/rfile/FSConfArgs.java | 47 ++
.../accumulo/core/client/rfile/RFile.java | 275 ++++++++
.../core/client/rfile/RFileScanner.java | 330 ++++++++++
.../core/client/rfile/RFileScannerBuilder.java | 148 +++++
.../accumulo/core/client/rfile/RFileSource.java | 44 ++
.../accumulo/core/client/rfile/RFileWriter.java | 236 +++++++
.../core/client/rfile/RFileWriterBuilder.java | 148 +++++
.../accumulo/core/client/sample/Sampler.java | 1 -
.../accumulo/core/file/FileOperations.java | 59 +-
.../apache/accumulo/core/file/rfile/RFile.java | 2 +-
.../core/file/rfile/RFileOperations.java | 45 +-
.../accumulo/core/iterators/IteratorUtil.java | 15 +
.../sample/impl/SamplerConfigurationImpl.java | 12 +
.../accumulo/core/util/LocalityGroupUtil.java | 2 +
.../accumulo/core/client/rfile/RFileTest.java | 647 +++++++++++++++++++
.../accumulo/core/file/rfile/RFileTest.java | 10 +-
.../accumulo/tserver/tablet/ScanDataSource.java | 13 +-
22 files changed, 2012 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
index 994b653..e7dc898 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/NewTableConfiguration.java
@@ -88,7 +88,7 @@ public class NewTableConfiguration {
*/
public NewTableConfiguration setProperties(Map<String,String> prop) {
checkArgument(prop != null, "properties is null");
- checkDisjoint(prop, samplerConfiguration);
+ SamplerConfigurationImpl.checkDisjoint(prop, samplerConfiguration);
this.properties = new HashMap<String,String>(prop);
return this;
@@ -114,16 +114,6 @@ public class NewTableConfiguration {
return Collections.unmodifiableMap(propertyMap);
}
- private void checkDisjoint(Map<String,String> props, SamplerConfiguration samplerConfiguration) {
- if (props.isEmpty() || samplerConfiguration == null) {
- return;
- }
-
- Map<String,String> sampleProps = new SamplerConfigurationImpl(samplerConfiguration).toTablePropertiesMap();
-
- checkArgument(Collections.disjoint(props.keySet(), sampleProps.keySet()), "Properties and derived sampler properties are not disjoint");
- }
-
/**
* Enable building a sample data set on the new table using the given sampler configuration.
*
@@ -131,7 +121,7 @@ public class NewTableConfiguration {
*/
public NewTableConfiguration enableSampling(SamplerConfiguration samplerConfiguration) {
requireNonNull(samplerConfiguration);
- checkDisjoint(properties, samplerConfiguration);
+ SamplerConfigurationImpl.checkDisjoint(properties, samplerConfiguration);
this.samplerConfiguration = samplerConfiguration;
return this;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
index f292902..3e56736 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
@@ -31,6 +31,8 @@ import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
@@ -538,7 +540,7 @@ public interface TableOperations {
Set<Range> splitRangeByTablets(String tableName, Range range, int maxSplits) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
/**
- * Bulk import all the files in a directory into a table.
+ * Bulk import all the files in a directory into a table. Files can be created using {@link AccumuloFileOutputFormat} and {@link RFile#newWriter()}
*
* @param tableName
* the name of the table
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
index c5017c3..54fe4ff 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
@@ -52,11 +52,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
-import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
-import org.apache.accumulo.core.iterators.system.DeletingIterator;
import org.apache.accumulo.core.iterators.system.MultiIterator;
-import org.apache.accumulo.core.iterators.system.VisibilityFilter;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
@@ -359,18 +355,12 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment(authorizations, acuTableConf, false, samplerConfImpl == null ? null
: samplerConfImpl.toSamplerConfiguration());
- DeletingIterator delIter = new DeletingIterator(multiIter, false);
-
- ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
-
- ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, new HashSet<Column>(options.fetchedColumns));
-
byte[] defaultSecurityLabel;
-
ColumnVisibility cv = new ColumnVisibility(acuTableConf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
defaultSecurityLabel = cv.getExpression();
- VisibilityFilter visFilter = new VisibilityFilter(colFilter, authorizations, defaultSecurityLabel);
+ SortedKeyValueIterator<Key,Value> visFilter = IteratorUtil.setupSystemScanIterators(multiIter, new HashSet<Column>(options.fetchedColumns), authorizations,
+ defaultSecurityLabel);
return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, extent, acuTableConf, options.serverSideIteratorList,
options.serverSideIteratorOptions, iterEnv, false));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
index 1e90e27..f2bc4cd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
@@ -17,20 +17,16 @@
package org.apache.accumulo.core.client.mapred;
import java.io.IOException;
-import java.util.Arrays;
import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -163,11 +159,10 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
final String extension = acuConf.get(Property.TABLE_FILE_TYPE);
final Path file = new Path(getWorkOutputPath(job), getUniqueName(job, "part") + "." + extension);
-
- final LRUMap validVisibilities = new LRUMap(ConfiguratorBase.getVisibilityCacheSize(conf));
+ final int visCacheSize = ConfiguratorBase.getVisibilityCacheSize(conf);
return new RecordWriter<Key,Value>() {
- FileSKVWriter out = null;
+ RFileWriter out = null;
@Override
public void close(Reporter reporter) throws IOException {
@@ -177,17 +172,9 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
@Override
public void write(Key key, Value value) throws IOException {
-
- Boolean wasChecked = (Boolean) validVisibilities.get(key.getColumnVisibilityData());
- if (wasChecked == null) {
- byte[] cv = key.getColumnVisibilityData().toArray();
- new ColumnVisibility(cv);
- validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE);
- }
-
if (out == null) {
- out = FileOperations.getInstance().newWriterBuilder().forFile(file.toString(), file.getFileSystem(conf), conf).withTableConfiguration(acuConf)
- .build();
+ out = RFile.newWriter().to(file.toString()).withFileSystem(file.getFileSystem(conf)).withTableProperties(acuConf)
+ .withVisibilityCacheSize(visCacheSize).build();
out.startDefaultLocalityGroup();
}
out.append(key, value);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
index b337f56..75afe2b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
@@ -17,19 +17,16 @@
package org.apache.accumulo.core.client.mapreduce;
import java.io.IOException;
-import java.util.Arrays;
+import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -161,11 +158,10 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
final String extension = acuConf.get(Property.TABLE_FILE_TYPE);
final Path file = this.getDefaultWorkFile(context, "." + extension);
-
- final LRUMap validVisibilities = new LRUMap(1000);
+ final int visCacheSize = ConfiguratorBase.getVisibilityCacheSize(conf);
return new RecordWriter<Key,Value>() {
- FileSKVWriter out = null;
+ RFileWriter out = null;
@Override
public void close(TaskAttemptContext context) throws IOException {
@@ -175,22 +171,13 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
@Override
public void write(Key key, Value value) throws IOException {
-
- Boolean wasChecked = (Boolean) validVisibilities.get(key.getColumnVisibilityData());
- if (wasChecked == null) {
- byte[] cv = key.getColumnVisibilityData().toArray();
- new ColumnVisibility(cv);
- validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE);
- }
-
if (out == null) {
- out = FileOperations.getInstance().newWriterBuilder().forFile(file.toString(), file.getFileSystem(conf), conf).withTableConfiguration(acuConf)
- .build();
+ out = RFile.newWriter().to(file.toString()).withFileSystem(file.getFileSystem(conf)).withTableProperties(acuConf)
+ .withVisibilityCacheSize(visCacheSize).build();
out.startDefaultLocalityGroup();
}
out.append(key, value);
}
};
}
-
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java
new file mode 100644
index 0000000..1679e43
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/FSConfArgs.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+class FSConfArgs {
+
+ FileSystem fs;
+ Configuration conf;
+
+ FileSystem getFileSystem() throws IOException {
+ if (fs == null) {
+ fs = FileSystem.get(getConf());
+ }
+ return fs;
+ }
+
+ Configuration getConf() throws IOException {
+ if (fs != null) {
+ return fs.getConf();
+ }
+
+ if (conf == null) {
+ conf = new Configuration();
+ }
+ return conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
new file mode 100644
index 0000000..bc5995e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+
+/**
+ * RFile is Accumulo's internal storage format for Key Value pairs. This class is a Factory that enables creating a {@link Scanner} for reading and a
+ * {@link RFileWriter} for writing Rfiles.
+ *
+ * <p>
+ * The {@link Scanner} created by this class makes it easy to experiment with real data from a live system on a developer's workstation. Also the {@link Scanner}
+ * can be used to write tools to analyze Accumulo's raw data.
+ *
+ * @since 1.8.0
+ */
+public class RFile {
+
+ /**
+ * This is an intermediate interface in a larger builder pattern. Supports setting the required input sources for reading a RFile.
+ *
+ * @since 1.8.0
+ */
+ public static interface InputArguments {
+ /**
+ * Specify RFiles to read from. When multiple inputs are specified the {@link Scanner} constructed will present a merged view.
+ *
+ * @param inputs
+ * one or more RFiles to read.
+ * @return this
+ */
+ ScannerOptions from(RFileSource... inputs);
+
+ /**
+ * Specify RFiles to read from. When multiple are specified the {@link Scanner} constructed will present a merged view.
+ *
+ * @param files
+ * one or more RFiles to read.
+ * @return this
+ */
+ ScannerFSOptions from(String... files);
+ }
+
+ /**
+ * This is an intermediate interface in a larger builder pattern. Enables optionally setting a FileSystem to read RFile(s) from.
+ *
+ * @since 1.8.0
+ */
+ public static interface ScannerFSOptions extends ScannerOptions {
+ /**
+ * Optionally provide a FileSystem to open RFiles. If not specified, the FileSystem will be constructed using configuration on the classpath.
+ *
+ * @param fs
+ * use this FileSystem to open files.
+ * @return this
+ */
+ ScannerOptions withFileSystem(FileSystem fs);
+ }
+
+ /**
+ * This is an intermediate interface in a larger builder pattern. Supports setting optional parameters for reading RFile(s) and building a scanner over
+ * RFile(s).
+ *
+ * @since 1.8.0
+ */
+ public static interface ScannerOptions {
+
+ /**
+ * By default the {@link Scanner} created will setup the default Accumulo system iterators. The iterators do things like the following :
+ *
+ * <ul>
+ * <li>Suppress deleted data</li>
+ * <li>Filter based on @link {@link Authorizations}</li>
+ * <li>Filter columns specified by functions like {@link Scanner#fetchColumn(Text, Text)} and {@link Scanner#fetchColumnFamily(Text)}</li>
+ * </ul>
+ *
+ * <p>
+ * Calling this method will turn off these system iterators and allow reading the raw data in an RFile. When reading the raw data, delete data and delete
+ * markers may be seen. Delete markers are {@link Key}s with the delete flag set.
+ *
+ * <p>
+ * Disabling system iterators will cause {@link #withAuthorizations(Authorizations)}, {@link Scanner#fetchColumn(Text, Text)}, and
+ * {@link Scanner#fetchColumnFamily(Text)} to throw runtime exceptions.
+ *
+ * @return this
+ */
+ public ScannerOptions withoutSystemIterators();
+
+ /**
+ * The authorizations passed here will be used to filter Keys, from the {@link Scanner}, based on the content of the column visibility field.
+ *
+ * @param auths
+ * scan with these authorizations
+ * @return this
+ */
+ public ScannerOptions withAuthorizations(Authorizations auths);
+
+ /**
+ * Enabling this option will cache RFiles data in memory. This option is useful when doing lots of random accesses.
+ *
+ * @param cacheSize
+ * the size of the data cache in bytes.
+ * @return this
+ */
+ public ScannerOptions withDataCache(long cacheSize);
+
+ /**
+ * Enabling this option will cache RFiles indexes in memory. Index data within a RFile is used to find data when seeking to a {@link Key}. This option is
+ * useful when doing lots of random accesses.
+ *
+ * @param cacheSize
+ * the size of the index cache in bytes.
+ * @return this
+ */
+ public ScannerOptions withIndexCache(long cacheSize);
+
+ /**
+ * This option allows limiting the {@link Scanner} from reading data outside of a given range. A scanner will not see any data outside of this range even if
+ * the RFile(s) have data outside the range.
+ *
+ * @return this
+ */
+ public ScannerOptions withBounds(Range range);
+
+ /**
+ * Construct the {@link Scanner} with iterators specified in a tables properties. Properties for a table can be obtained by calling
+ * {@link TableOperations#getProperties(String)}
+ *
+ * @param props
+ * iterable over Accumulo table key value properties.
+ * @return this
+ */
+ public ScannerOptions withTableProperties(Iterable<Entry<String,String>> props);
+
+ /**
+ * @see #withTableProperties(Iterable)
+ * @param props
+ * a map instead of an Iterable
+ * @return this
+ */
+ public ScannerOptions withTableProperties(Map<String,String> props);
+
+ /**
+ * @return a Scanner over RFile using the specified options.
+ */
+ public Scanner build();
+ }
+
+ /**
+ * Entry point for building a new {@link Scanner} over one or more RFiles.
+ */
+ public static InputArguments newScanner() {
+ return new RFileScannerBuilder();
+ }
+
+ /**
+ * This is an intermediate interface in a larger builder pattern. Supports setting the required output sink to write a RFile to.
+ *
+ * @since 1.8.0
+ */
+ public static interface OutputArguments {
+ /**
+ * @param filename
+ * name of file to write RFile data
+ * @return this
+ */
+ public WriterFSOptions to(String filename);
+
+ /**
+ * @param out
+ * output stream to write RFile data
+ * @return this
+ */
+ public WriterOptions to(OutputStream out);
+ }
+
+ /**
+ * This is an intermediate interface in a larger builder pattern. Enables optionally setting a FileSystem to write to.
+ *
+ * @since 1.8.0
+ */
+ public static interface WriterFSOptions extends WriterOptions {
+ /**
+ * Optionally provide a FileSystem to open a file to write a RFile. If not specified, the FileSystem will be constructed using configuration on the
+ * classpath.
+ *
+ * @param fs
+ * use this FileSystem to open files.
+ * @return this
+ */
+ WriterOptions withFileSystem(FileSystem fs);
+ }
+
+ /**
+ * This is an intermediate interface in a larger builder pattern. Supports setting optional parameters for creating a RFile and building a RFileWriter.
+ *
+ * @since 1.8.0
+ */
+ public static interface WriterOptions {
+ /**
+ * An option to store sample data in the generated RFile.
+ *
+ * @param samplerConf
+ * configuration to use when generating sample data.
+ * @throws IllegalArgumentException
+ * if table properties were previously specified and the table properties also specify a sampler.
+ * @return this
+ */
+ public WriterOptions withSampler(SamplerConfiguration samplerConf);
+
+ /**
+ * Create an RFile using the same configuration as an Accumulo table. Properties for a table can be obtained by calling
+ * {@link TableOperations#getProperties(String)}
+ *
+ * @param props
+ * iterable over Accumulo table key value properties.
+ * @throws IllegalArgumentException
+ * if sampler was previously specified and the table properties also specify a sampler.
+ * @return this
+ */
+ public WriterOptions withTableProperties(Iterable<Entry<String,String>> props);
+
+ /**
+ * @see #withTableProperties(Iterable)
+ */
+ public WriterOptions withTableProperties(Map<String,String> props);
+
+ /**
+ * @param maxSize
+ * As keys are added to an RFile the visibility field is validated. Validating the visibility field requires parsing it. In order to make
+ * validation faster, previously seen visibilities are cached. This option allows setting the maximum size of this cache.
+ * @return this
+ */
+ public WriterOptions withVisibilityCacheSize(int maxSize);
+
+ /**
+ * @return a new RFileWriter created with the options previously specified.
+ */
+ public RFileWriter build() throws IOException;
+ }
+
+ /**
+ * Entry point for creating a new RFile writer.
+ */
+ public static OutputArguments newWriter() {
+ return new RFileWriterBuilder();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
new file mode 100644
index 0000000..4dfba68
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.BaseIteratorEnvironment;
+import org.apache.accumulo.core.client.impl.ScannerOptions;
+import org.apache.accumulo.core.client.rfile.RFileScannerBuilder.InputArgs;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Column;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.iterators.IteratorAdapter;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Scanner implementation that reads key/values directly from RFiles rather than from a live
+ * Accumulo instance. Instances are configured and created by the RFile scanner builder API.
+ * Like other Scanner implementations, instances are not intended for concurrent use.
+ */
+class RFileScanner extends ScannerOptions implements Scanner {
+
+ private static final byte[] EMPTY_BYTES = new byte[0];
+ private static final Range EMPTY_RANGE = new Range();
+
+ // Range to scan; null means scan the entire file (see iterator()).
+ private Range range;
+ private BlockCache dataCache = null;
+ private BlockCache indexCache = null;
+ private Opts opts;
+ // batchSize and readaheadThreshold exist to satisfy the Scanner interface; reads here are
+ // local file reads, so these settings do not drive any background fetching.
+ private int batchSize = 1000;
+ private long readaheadThreshold = 3;
+
+ // Block size used when sizing the LRU caches; taken from the default tserver configuration.
+ private static final long CACHE_BLOCK_SIZE = AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
+
+ // Plain holder for options gathered by RFileScannerBuilder before construction.
+ static class Opts {
+ InputArgs in;
+ Authorizations auths = Authorizations.EMPTY;
+ long dataCacheSize;
+ long indexCacheSize;
+ boolean useSystemIterators = true;
+ public HashMap<String,String> tableConfig;
+ Range bounds;
+ }
+
+ // This cache exists as a hack to avoid leaking decompressors. When the RFile code is not given a
+ // cache it reads blocks directly from the decompressor. However if a user does not read all data
+ // for a scan this can leave a BCFile block open and a decompressor allocated.
+ //
+ // By providing a cache to the RFile code it forces each block to be read into memory. When a
+ // block is accessed the entire thing is read into memory immediately allocating and deallocating
+ // a decompressor. If the user does not read all data, no decompressors are left allocated.
+ private static class NoopCache implements BlockCache {
+ @Override
+ public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
+ return null;
+ }
+
+ @Override
+ public CacheEntry cacheBlock(String blockName, byte[] buf) {
+ return null;
+ }
+
+ @Override
+ public CacheEntry getBlock(String blockName) {
+ return null;
+ }
+
+ @Override
+ public long getMaxSize() {
+ return Integer.MAX_VALUE;
+ }
+ }
+
+ /**
+ * @throws IllegalArgumentException
+ * if authorizations were set while system iterators were disabled; authorizations are
+ * only applied by the system visibility-filtering iterator.
+ */
+ RFileScanner(Opts opts) {
+ if (!opts.auths.equals(Authorizations.EMPTY) && !opts.useSystemIterators) {
+ throw new IllegalArgumentException("Set authorizations and specified not to use system iterators");
+ }
+
+ this.opts = opts;
+ // A cache size of zero means the user did not request caching; fall back to NoopCache
+ // (see comment above) rather than no cache at all.
+ if (opts.indexCacheSize > 0) {
+ this.indexCache = new LruBlockCache(opts.indexCacheSize, CACHE_BLOCK_SIZE);
+ } else {
+ this.indexCache = new NoopCache();
+ }
+
+ if (opts.dataCacheSize > 0) {
+ this.dataCache = new LruBlockCache(opts.dataCacheSize, CACHE_BLOCK_SIZE);
+ } else {
+ this.dataCache = new NoopCache();
+ }
+ }
+
+ // Column fetching is implemented by the system scan iterators, so it is only legal when
+ // system iterators are enabled.
+ @Override
+ public synchronized void fetchColumnFamily(Text col) {
+ Preconditions.checkArgument(opts.useSystemIterators, "Can only fetch columns when using system iterators");
+ super.fetchColumnFamily(col);
+ }
+
+ @Override
+ public synchronized void fetchColumn(Text colFam, Text colQual) {
+ Preconditions.checkArgument(opts.useSystemIterators, "Can only fetch columns when using system iterators");
+ super.fetchColumn(colFam, colQual);
+ }
+
+ @Override
+ public void fetchColumn(IteratorSetting.Column column) {
+ Preconditions.checkArgument(opts.useSystemIterators, "Can only fetch columns when using system iterators");
+ super.fetchColumn(column);
+ }
+
+ // Class loader contexts only make sense against a live instance, not local files.
+ @Override
+ public void setClassLoaderContext(String classLoaderContext) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated
+ @Override
+ public void setTimeOut(int timeOut) {
+ // Integer.MAX_VALUE historically meant "no timeout"; preserve that by mapping it to
+ // Long.MAX_VALUE millis instead of Integer.MAX_VALUE seconds.
+ if (timeOut == Integer.MAX_VALUE)
+ setTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ else
+ setTimeout(timeOut, TimeUnit.SECONDS);
+ }
+
+ @Deprecated
+ @Override
+ public int getTimeOut() {
+ // Clamp to Integer.MAX_VALUE, the legacy "no timeout" sentinel.
+ long timeout = getTimeout(TimeUnit.SECONDS);
+ if (timeout >= Integer.MAX_VALUE)
+ return Integer.MAX_VALUE;
+ return (int) timeout;
+ }
+
+ @Override
+ public void setRange(Range range) {
+ this.range = range;
+ }
+
+ @Override
+ public Range getRange() {
+ return range;
+ }
+
+ @Override
+ public void setBatchSize(int size) {
+ this.batchSize = size;
+ }
+
+ @Override
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ // Isolation is meaningless for immutable files; accept the calls as no-ops.
+ @Override
+ public void enableIsolation() {}
+
+ @Override
+ public void disableIsolation() {}
+
+ @Override
+ public synchronized void setReadaheadThreshold(long batches) {
+ Preconditions.checkArgument(batches > 0);
+ readaheadThreshold = batches;
+ }
+
+ @Override
+ public synchronized long getReadaheadThreshold() {
+ return readaheadThreshold;
+ }
+
+ @Override
+ public Authorizations getAuthorizations() {
+ return opts.auths;
+ }
+
+ @Override
+ public void addScanIterator(IteratorSetting cfg) {
+ super.addScanIterator(cfg);
+ }
+
+ @Override
+ public void removeScanIterator(String iteratorName) {
+ super.removeScanIterator(iteratorName);
+ }
+
+ @Override
+ public void updateScanIteratorOption(String iteratorName, String key, String value) {
+ super.updateScanIteratorOption(iteratorName, key, value);
+ }
+
+ // Minimal iterator environment handed to user-configured scan iterators; reports scan scope
+ // and reflects this scanner's authorizations and sampler configuration.
+ private class IterEnv extends BaseIteratorEnvironment {
+ @Override
+ public IteratorScope getIteratorScope() {
+ return IteratorScope.scan;
+ }
+
+ @Override
+ public boolean isFullMajorCompaction() {
+ return false;
+ }
+
+ @Override
+ public Authorizations getAuthorizations() {
+ return opts.auths;
+ }
+
+ @Override
+ public boolean isSamplingEnabled() {
+ return RFileScanner.this.getSamplerConfiguration() != null;
+ }
+
+ @Override
+ public SamplerConfiguration getSamplerConfiguration() {
+ return RFileScanner.this.getSamplerConfiguration();
+ }
+ }
+
+ @Override
+ public Iterator<Entry<Key,Value>> iterator() {
+ try {
+ // Open an RFile reader over each configured source.
+ RFileSource[] sources = opts.in.getSources();
+ List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(sources.length);
+ for (int i = 0; i < sources.length; i++) {
+ FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream();
+ readers.add(new RFile.Reader(new CachableBlockFile.Reader(inputStream, sources[i].getLength(), opts.in.getConf(), dataCache, indexCache,
+ AccumuloConfiguration.getDefaultConfiguration())));
+ }
+
+ // If sampling was requested, swap each reader for its sample view.
+ if (getSamplerConfiguration() != null) {
+ for (int i = 0; i < readers.size(); i++) {
+ readers.set(i, ((Reader) readers.get(i)).getSample(new SamplerConfigurationImpl(getSamplerConfiguration())));
+ }
+ }
+
+ // Merge the readers, optionally clipping to the configured bounds.
+ SortedKeyValueIterator<Key,Value> iterator;
+ if (opts.bounds != null) {
+ iterator = new MultiIterator(readers, opts.bounds);
+ } else {
+ iterator = new MultiIterator(readers, false);
+ }
+
+ Set<ByteSequence> families = Collections.emptySet();
+
+ // System iterators provide visibility filtering and column selection, mirroring what a
+ // tserver applies at scan time.
+ if (opts.useSystemIterators) {
+ SortedSet<Column> cols = this.getFetchedColumns();
+ families = LocalityGroupUtil.families(cols);
+ iterator = IteratorUtil.setupSystemScanIterators(iterator, cols, getAuthorizations(), EMPTY_BYTES);
+ }
+
+ // Stack user-configured scan iterators on top, using table config when provided.
+ try {
+ if (opts.tableConfig != null && opts.tableConfig.size() > 0) {
+ ConfigurationCopy conf = new ConfigurationCopy(opts.tableConfig);
+ iterator = IteratorUtil.loadIterators(IteratorScope.scan, iterator, null, conf, serverSideIteratorList, serverSideIteratorOptions, new IterEnv());
+ } else {
+ iterator = IteratorUtil.loadIterators(iterator, serverSideIteratorList, serverSideIteratorOptions, new IterEnv(), false, null);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ iterator.seek(getRange() == null ? EMPTY_RANGE : getRange(), families, families.size() == 0 ? false : true);
+ return new IteratorAdapter(iterator);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (dataCache instanceof LruBlockCache) {
+ ((LruBlockCache) dataCache).shutdown();
+ }
+
+ if (indexCache instanceof LruBlockCache) {
+ ((LruBlockCache) indexCache).shutdown();
+ }
+
+ // NOTE(review): readers created in iterator() are not tracked here; closing the underlying
+ // input streams appears to be what releases their resources -- confirm this is sufficient.
+ try {
+ for (RFileSource source : opts.in.getSources()) {
+ source.getInputStream().close();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
new file mode 100644
index 0000000..92e07b4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScannerBuilder.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.rfile.RFile.ScannerFSOptions;
+import org.apache.accumulo.core.client.rfile.RFile.ScannerOptions;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Fluent builder producing {@link RFileScanner} instances. Implements all of the builder's
+ * chained interfaces; the interface types returned at each step constrain the legal call order
+ * (an input must be supplied via from() before other options are set).
+ */
+class RFileScannerBuilder implements RFile.InputArguments, RFile.ScannerFSOptions, RFile.ScannerOptions {
+
+ // Holds the scanner's input, either as file paths to open lazily or as pre-opened sources.
+ static class InputArgs extends FSConfArgs {
+ private Path[] paths;
+ private RFileSource[] sources;
+
+ InputArgs(String... files) {
+ this.paths = new Path[files.length];
+ for (int i = 0; i < files.length; i++) {
+ this.paths[i] = new Path(files[i]);
+ }
+ }
+
+ InputArgs(RFileSource... sources) {
+ this.sources = sources;
+ }
+
+ /**
+ * Lazily resolves paths to opened sources on first call; also wraps any plain InputStream
+ * sources in FSDataInputStream, which the RFile reader code requires.
+ */
+ RFileSource[] getSources() throws IOException {
+ if (sources == null) {
+ sources = new RFileSource[paths.length];
+ for (int i = 0; i < paths.length; i++) {
+ sources[i] = new RFileSource(getFileSystem().open(paths[i]), getFileSystem().getFileStatus(paths[i]).getLen());
+ }
+ } else {
+ for (int i = 0; i < sources.length; i++) {
+ if (!(sources[i].getInputStream() instanceof FSDataInputStream)) {
+ sources[i] = new RFileSource(new FSDataInputStream(sources[i].getInputStream()), sources[i].getLength());
+ }
+ }
+ }
+
+ return sources;
+ }
+ }
+
+ private RFileScanner.Opts opts = new RFileScanner.Opts();
+
+ @Override
+ public ScannerOptions withoutSystemIterators() {
+ opts.useSystemIterators = false;
+ return this;
+ }
+
+ @Override
+ public ScannerOptions withAuthorizations(Authorizations auths) {
+ Objects.requireNonNull(auths);
+ opts.auths = auths;
+ return this;
+ }
+
+ @Override
+ public ScannerOptions withDataCache(long cacheSize) {
+ Preconditions.checkArgument(cacheSize > 0);
+ opts.dataCacheSize = cacheSize;
+ return this;
+ }
+
+ @Override
+ public ScannerOptions withIndexCache(long cacheSize) {
+ Preconditions.checkArgument(cacheSize > 0);
+ opts.indexCacheSize = cacheSize;
+ return this;
+ }
+
+ @Override
+ public Scanner build() {
+ return new RFileScanner(opts);
+ }
+
+ // NOTE(review): dereferences opts.in, so this relies on from() having been called first;
+ // the interface chaining (ScannerFSOptions only reachable after from) appears to enforce
+ // that -- confirm no other call path exists.
+ @Override
+ public ScannerOptions withFileSystem(FileSystem fs) {
+ Objects.requireNonNull(fs);
+ opts.in.fs = fs;
+ return this;
+ }
+
+ @Override
+ public ScannerOptions from(RFileSource... inputs) {
+ opts.in = new InputArgs(inputs);
+ return this;
+ }
+
+ @Override
+ public ScannerFSOptions from(String... files) {
+ opts.in = new InputArgs(files);
+ return this;
+ }
+
+ // Copies the entries into a private map so later mutation of the argument has no effect.
+ @Override
+ public ScannerOptions withTableProperties(Iterable<Entry<String,String>> tableConfig) {
+ Objects.requireNonNull(tableConfig);
+ this.opts.tableConfig = new HashMap<>();
+ for (Entry<String,String> entry : tableConfig) {
+ this.opts.tableConfig.put(entry.getKey(), entry.getValue());
+ }
+ return this;
+ }
+
+ @Override
+ public ScannerOptions withTableProperties(Map<String,String> tableConfig) {
+ Objects.requireNonNull(tableConfig);
+ this.opts.tableConfig = new HashMap<>(tableConfig);
+ return this;
+ }
+
+ @Override
+ public ScannerOptions withBounds(Range range) {
+ Objects.requireNonNull(range);
+ this.opts.bounds = range;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSource.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSource.java
new file mode 100644
index 0000000..21298c3
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSource.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.InputStream;
+
+/**
+ * RFile metadata is stored at the end of the file. In order to read an RFile, its length must be known. This provides a way to pass an InputStream and length
+ * for reading an RFile.
+ *
+ * @since 1.8.0
+ */
+public class RFileSource {
+ // Stream positioned at the start of the RFile data.
+ private final InputStream in;
+ // Total length of the RFile in bytes; required to locate the trailing metadata.
+ private final long len;
+
+ public RFileSource(InputStream in, long len) {
+ this.in = in;
+ this.len = len;
+ }
+
+ public InputStream getInputStream() {
+ return in;
+ }
+
+ public long getLength() {
+ return len;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
new file mode 100644
index 0000000..aad4908
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.commons.collections.map.LRUMap;
+
+import com.google.common.base.Preconditions;
+
+//formatter was adding spaces that checkstyle did not like, so turned off formatter
+//@formatter:off
+/**
+ * This class provides an API for writing RFiles. It can be used to create file for bulk import into Accumulo using
+ * {@link TableOperations#importDirectory(String, String, String, boolean)}
+ *
+ * <p>
+ * A RFileWriter has the following constraints. Violating these constraints will result in runtime exceptions.
+ *
+ * <ul>
+ * <li>Before appending any keys, a locality group must be started by calling one of the startNewLocalityGroup functions or startDefaultLocalityGroup.</li>
+ * <li>Keys must be appended in sorted order within a locality group.</li>
+ * <li>Locality groups must have a mutually exclusive set of column families.</li>
+ * <li>The default locality group must be started last.</li>
+ * </ul>
+ *
+ *
+ * <p>
+ * Below is an example of using RFileWriter
+ *
+ * <p>
+ *
+ * <pre>
+ * {@code
+ * Iterable<Entry<Key, Value>> localityGroup1Data = ...
+ * Iterable<Entry<Key, Value>> localityGroup2Data = ...
+ * Iterable<Entry<Key, Value>> defaultGroupData = ...
+ *
+ * try(RFileWriter writer = RFile.newWriter().to(file).build()){
+ *
+ * //Start a locality group before appending data.
+ * writer.startNewLocalityGroup("groupA", "columnFam1", "columnFam2");
+ * //Append data to the locality group that was started above. Must append in sorted order.
+ * writer.append(localityGroup1Data);
+ *
+ * //Add another locality group.
+ * writer.startNewLocalityGroup("groupB", "columnFam3", "columnFam4");
+ * writer.append(localityGroup2Data);
+ *
+ * //The default locality group must be started last. The column families for the default group do not need to be specified.
+ * writer.startDefaultLocalityGroup();
+ * //Data appended here can not contain any column families specified in previous locality groups.
+ * writer.append(defaultGroupData);
+ *
+ * //This is a try-with-resources so the writer is closed here at the end of the code block.
+ * }
+ * }
+ * </pre>
+ *
+ * <p>
+ * Create instances by calling {@link RFile#newWriter()}
+ *
+ * @since 1.8.0
+ */
+// @formatter:on
+public class RFileWriter implements AutoCloseable {
+
+ private FileSKVWriter writer;
+ // Cache of visibility expressions already parsed successfully; bounds the cost of
+ // re-validating the same visibility on every appended key.
+ private final LRUMap validVisibilities;
+ // True once any locality group (named or default) has been started.
+ private boolean startedLG;
+ // True once the default locality group has been started; no group may start after it.
+ private boolean startedDefaultLG;
+
+ RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize) {
+ this.writer = fileSKVWriter;
+ this.validVisibilities = new LRUMap(visCacheSize);
+ }
+
+ // Common implementation for all startNewLocalityGroup overloads.
+ private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
+ Preconditions.checkState(!startedDefaultLG, "Cannot start a locality group after starting the default locality group");
+ writer.startNewLocalityGroup(name, columnFamilies);
+ startedLG = true;
+ }
+
+ /**
+ * Before appending any data, a locality group must be started. The default locality group must be started last.
+ *
+ * @param name
+ * locality group name, used for informational purposes
+ * @param families
+ * the column families the locality group can contain
+ *
+ * @throws IllegalStateException
+ * When default locality group already started.
+ */
+ public void startNewLocalityGroup(String name, List<byte[]> families) throws IOException {
+ HashSet<ByteSequence> fams = new HashSet<>();
+ for (byte[] family : families) {
+ fams.add(new ArrayByteSequence(family));
+ }
+ _startNewLocalityGroup(name, fams);
+ }
+
+ /**
+ * See documentation for {@link #startNewLocalityGroup(String, List)}.
+ *
+ * @throws IllegalStateException
+ * When default locality group already started.
+ */
+ public void startNewLocalityGroup(String name, byte[]... families) throws IOException {
+ startNewLocalityGroup(name, Arrays.asList(families));
+ }
+
+ /**
+ * See documentation for {@link #startNewLocalityGroup(String, List)}.
+ *
+ * @param families
+ * will be encoded using UTF-8
+ *
+ * @throws IllegalStateException
+ * When default locality group already started.
+ */
+ public void startNewLocalityGroup(String name, Set<String> families) throws IOException {
+ HashSet<ByteSequence> fams = new HashSet<>();
+ for (String family : families) {
+ fams.add(new ArrayByteSequence(family));
+ }
+ _startNewLocalityGroup(name, fams);
+ }
+
+ /**
+ * See documentation for {@link #startNewLocalityGroup(String, List)}.
+ *
+ * @param families
+ * will be encoded using UTF-8
+ *
+ * @throws IllegalStateException
+ * When default locality group already started.
+ */
+ public void startNewLocalityGroup(String name, String... families) throws IOException {
+ HashSet<ByteSequence> fams = new HashSet<>();
+ for (String family : families) {
+ fams.add(new ArrayByteSequence(family));
+ }
+ _startNewLocalityGroup(name, fams);
+ }
+
+ /**
+ * A locality group in which the column families do not need to specified. The locality group must be started after all other locality groups. Can not append
+ * column families that were in a previous locality group.
+ *
+ * @throws IllegalStateException
+ * When default locality group already started.
+ */
+
+ public void startDefaultLocalityGroup() throws IOException {
+ Preconditions.checkState(!startedDefaultLG);
+ writer.startDefaultLocalityGroup();
+ startedDefaultLG = true;
+ startedLG = true;
+ }
+
+ /**
+ * Append the key and value to the last locality group that was started.
+ *
+ * @param key
+ * This key must be greater than or equal to the last key appended. For non-default locality groups, the keys column family must be one of the column
+ * families specified when calling startNewLocalityGroup(). Must be non-null.
+ * @param val
+ * value to append, must be non-null.
+ *
+ * @throws IllegalArgumentException
+ * This is thrown when data is appended out of order OR when the key contains a invalid visibility OR when a column family is not valid for a
+ * locality group.
+ * @throws IllegalStateException
+ * Thrown when no locality group was started.
+ */
+ public void append(Key key, Value val) throws IOException {
+ Preconditions.checkState(startedLG, "No locality group was started");
+ Boolean wasChecked = (Boolean) validVisibilities.get(key.getColumnVisibilityData());
+ if (wasChecked == null) {
+ // Parsing the visibility validates it; ColumnVisibility throws on malformed input.
+ byte[] cv = key.getColumnVisibilityData().toArray();
+ new ColumnVisibility(cv);
+ // Cache a defensive copy of the bytes since the key's backing array may be reused.
+ validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE);
+ }
+ writer.append(key, val);
+ }
+
+ /**
+ * Append the keys and values to the last locality group that was started.
+ *
+ * @param keyValues
+ * The keys must be in sorted order. The first key returned by the iterable must be greater than or equal to the last key appended. For non-default
+ * locality groups, the keys column family must be one of the column families specified when calling startNewLocalityGroup(). Must be non-null.
+ *
+ * @throws IllegalArgumentException
+ * This is thrown when data is appended out of order OR when the key contains a invalid visibility OR when a column family is not valid for a
+ * locality group.
+ * @throws IllegalStateException
+ * When no locality group was started.
+ */
+ public void append(Iterable<Entry<Key,Value>> keyValues) throws IOException {
+ Preconditions.checkState(startedLG, "No locality group was started");
+ for (Entry<Key,Value> entry : keyValues) {
+ append(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
new file mode 100644
index 0000000..e4a141c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.rfile;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+import org.apache.accumulo.core.client.rfile.RFile.WriterFSOptions;
+import org.apache.accumulo.core.client.rfile.RFile.WriterOptions;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+/**
+ * Fluent builder producing {@link RFileWriter} instances. Implements all of the builder's
+ * chained interfaces; the interface types returned at each step constrain the legal call order
+ * (a destination must be supplied via to() before other options are set).
+ */
+class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions, RFile.WriterOptions {
+
+ // Holds the writer's destination: exactly one of path or out is set.
+ private static class OutputArgs extends FSConfArgs {
+ private Path path;
+ private OutputStream out;
+
+ OutputArgs(String filename) {
+ this.path = new Path(filename);
+ }
+
+ OutputArgs(OutputStream out) {
+ this.out = out;
+ }
+
+ OutputStream getOutputStream() {
+ return out;
+ }
+ }
+
+ private OutputArgs out;
+ private SamplerConfiguration sampler = null;
+ private Map<String,String> tableConfig = Collections.emptyMap();
+ // Default size of the RFileWriter's visibility-validation cache.
+ private int visCacheSize = 1000;
+
+ @Override
+ public WriterOptions withSampler(SamplerConfiguration samplerConf) {
+ Objects.requireNonNull(samplerConf);
+ // Reject the combination of an explicit sampler and table properties that also
+ // configure a sampler (documented IllegalArgumentException).
+ SamplerConfigurationImpl.checkDisjoint(tableConfig, samplerConf);
+ this.sampler = samplerConf;
+ return this;
+ }
+
+ @Override
+ public RFileWriter build() throws IOException {
+ FileOperations fileops = FileOperations.getInstance();
+ AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
+ // Layer user-supplied properties (sampler config first, then table config) over the
+ // default configuration.
+ HashMap<String,String> userProps = new HashMap<>();
+ if (sampler != null) {
+ userProps.putAll(new SamplerConfigurationImpl(sampler).toTablePropertiesMap());
+ }
+ userProps.putAll(tableConfig);
+
+ if (userProps.size() > 0) {
+ acuconf = new ConfigurationCopy(Iterables.concat(acuconf, userProps.entrySet()));
+ }
+
+ if (out.getOutputStream() != null) {
+ FSDataOutputStream fsdo;
+ if (out.getOutputStream() instanceof FSDataOutputStream) {
+ fsdo = (FSDataOutputStream) out.getOutputStream();
+ } else {
+ // NOTE(review): wraps the stream with a throwaway Statistics("foo") object just to
+ // satisfy the FSDataOutputStream constructor -- the scheme name is not used.
+ fsdo = new FSDataOutputStream(out.getOutputStream(), new FileSystem.Statistics("foo"));
+ }
+ return new RFileWriter(fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf()).withTableConfiguration(acuconf).build(), visCacheSize);
+ } else {
+ return new RFileWriter(fileops.newWriterBuilder().forFile(out.path.toString(), out.getFileSystem(), out.getConf()).withTableConfiguration(acuconf)
+ .build(), visCacheSize);
+ }
+ }
+
+ // NOTE(review): dereferences out, so this relies on to() having been called first; the
+ // interface chaining (WriterFSOptions only reachable after to(String)) appears to enforce
+ // that -- confirm no other call path exists.
+ @Override
+ public WriterOptions withFileSystem(FileSystem fs) {
+ Objects.requireNonNull(fs);
+ out.fs = fs;
+ return this;
+ }
+
+ @Override
+ public WriterFSOptions to(String filename) {
+ Objects.requireNonNull(filename);
+ this.out = new OutputArgs(filename);
+ return this;
+ }
+
+ @Override
+ public WriterOptions to(OutputStream out) {
+ Objects.requireNonNull(out);
+ this.out = new OutputArgs(out);
+ return this;
+ }
+
+ // Copies the entries into a private map so later mutation of the argument has no effect.
+ @Override
+ public WriterOptions withTableProperties(Iterable<Entry<String,String>> tableConfig) {
+ Objects.requireNonNull(tableConfig);
+ HashMap<String,String> cfg = new HashMap<>();
+ for (Entry<String,String> entry : tableConfig) {
+ cfg.put(entry.getKey(), entry.getValue());
+ }
+
+ // Reject table properties that configure a sampler when one was already set explicitly.
+ SamplerConfigurationImpl.checkDisjoint(cfg, sampler);
+ this.tableConfig = cfg;
+ return this;
+ }
+
+ @Override
+ public WriterOptions withTableProperties(Map<String,String> tableConfig) {
+ Objects.requireNonNull(tableConfig);
+ return withTableProperties(tableConfig.entrySet());
+ }
+
+ @Override
+ public WriterOptions withVisibilityCacheSize(int maxSize) {
+ Preconditions.checkArgument(maxSize > 0);
+ this.visCacheSize = maxSize;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java b/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java
index 03bd9d7..8b4db95 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/sample/Sampler.java
@@ -36,7 +36,6 @@ import org.apache.accumulo.core.data.Key;
*
* @since 1.8.0
*/
-
public interface Sampler {
/**
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 314bbae..4724bbe 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
public abstract class FileOperations {
@@ -76,7 +77,7 @@ public abstract class FileOperations {
* </pre>
*/
public NeedsFile<GetFileSizeOperationBuilder> getFileSize() {
- return (NeedsFile<GetFileSizeOperationBuilder>) new GetFileSizeOperation();
+ return new GetFileSizeOperation();
}
/**
@@ -92,8 +93,8 @@ public abstract class FileOperations {
* .build();
* </pre>
*/
- public NeedsFile<OpenWriterOperationBuilder> newWriterBuilder() {
- return (NeedsFile<OpenWriterOperationBuilder>) new OpenWriterOperation();
+ public NeedsFileOrOuputStream<OpenWriterOperationBuilder> newWriterBuilder() {
+ return new OpenWriterOperation();
}
/**
@@ -110,7 +111,7 @@ public abstract class FileOperations {
* </pre>
*/
public NeedsFile<OpenIndexOperationBuilder> newIndexReaderBuilder() {
- return (NeedsFile<OpenIndexOperationBuilder>) new OpenIndexOperation();
+ return new OpenIndexOperation();
}
/**
@@ -150,7 +151,7 @@ public abstract class FileOperations {
* </pre>
*/
public NeedsFile<OpenReaderOperationBuilder> newReaderBuilder() {
- return (NeedsFile<OpenReaderOperationBuilder>) new OpenReaderOperation();
+ return new OpenReaderOperation();
}
//
@@ -203,6 +204,10 @@ public abstract class FileOperations {
return (SubclassType) this;
}
+ /** Records the file name this operation applies to; used by subclasses that derive it (e.g. from an extension). */
+ protected void setFilename(String filename) {
+ this.filename = filename;
+ }
+
public String getFilename() {
return filename;
}
@@ -211,6 +216,10 @@ public abstract class FileOperations {
return fs;
}
+ /** Records the Hadoop configuration this operation should use. */
+ protected void setConfiguration(Configuration fsConf) {
+ this.fsConf = fsConf;
+ }
+
public Configuration getConfiguration() {
return fsConf;
}
@@ -239,6 +248,7 @@ public abstract class FileOperations {
*/
protected class GetFileSizeOperation extends FileAccessOperation<GetFileSizeOperation> implements GetFileSizeOperationBuilder {
/** Return the size of the file. */
+ @Override
public long execute() throws IOException {
validate();
return getFileSize(this);
@@ -278,9 +288,20 @@ public abstract class FileOperations {
/**
* Operation object for constructing a writer.
*/
- protected class OpenWriterOperation extends FileIOOperation<OpenWriterOperation> implements OpenWriterOperationBuilder {
+ protected class OpenWriterOperation extends FileIOOperation<OpenWriterOperation> implements OpenWriterOperationBuilder,
+ NeedsFileOrOuputStream<OpenWriterOperationBuilder> {
private String compression;
+ private FSDataOutputStream outputStream;
+ @Override
+ public NeedsTableConfiguration<OpenWriterOperationBuilder> forOutputStream(String extension, FSDataOutputStream outputStream, Configuration fsConf) {
+ // Fixed parameter typo: "extenstion" -> "extension" (source/binary compatible).
+ this.outputStream = outputStream;
+ setConfiguration(fsConf);
+ // No real path exists when writing to a caller-supplied stream; fabricate a
+ // placeholder name carrying the extension (e.g. ".rf") so the file-type
+ // machinery that dispatches on the filename suffix still works.
+ setFilename("foo" + extension);
+ return this;
+ }
+
+ @Override
public OpenWriterOperation withCompression(String compression) {
this.compression = compression;
return this;
@@ -290,6 +311,21 @@ public abstract class FileOperations {
return compression;
}
+ /** Returns the caller-supplied output stream, or null when writing to a file path instead. */
+ public FSDataOutputStream getOutputStream() {
+ return outputStream;
+ }
+
+ @Override
+ protected void validate() {
+ if (outputStream == null) {
+ // Writing to a file path: the inherited checks (filename, filesystem, etc.) apply.
+ super.validate();
+ } else {
+ // Writing to a caller-supplied stream: only the Hadoop and table
+ // configurations are required; filename/filesystem checks do not apply.
+ Objects.requireNonNull(getConfiguration());
+ Objects.requireNonNull(getTableConfiguration());
+ }
+ }
+
+ @Override
public FileSKVWriter build() throws IOException {
validate();
return openWriter(this);
@@ -359,6 +395,7 @@ public abstract class FileOperations {
* Operation object for opening an index.
*/
protected class OpenIndexOperation extends FileReaderOperation<OpenIndexOperation> implements OpenIndexOperationBuilder {
+ @Override
public FileSKVIterator build() throws IOException {
validate();
return openIndex(this);
@@ -378,6 +415,7 @@ public abstract class FileOperations {
private boolean inclusive;
/** Set the range over which the constructed iterator will search. */
+ @Override
public OpenScanReaderOperation overRange(Range range, Set<ByteSequence> columnFamilies, boolean inclusive) {
this.range = range;
this.columnFamilies = columnFamilies;
@@ -407,6 +445,7 @@ public abstract class FileOperations {
}
/** Execute the operation, constructing a scan iterator. */
+ @Override
public FileSKVIterator build() throws IOException {
validate();
return openScanReader(this);
@@ -427,11 +466,13 @@ public abstract class FileOperations {
/**
* Seek the constructed iterator to the beginning of its domain before returning. Equivalent to {@code seekToBeginning(true)}.
*/
+ @Override
public OpenReaderOperation seekToBeginning() {
return seekToBeginning(true);
}
/** If true, seek the constructed iterator to the beginning of its domain before returning. */
+ @Override
public OpenReaderOperation seekToBeginning(boolean seekToBeginning) {
this.seekToBeginning = seekToBeginning;
return this;
@@ -442,6 +483,7 @@ public abstract class FileOperations {
}
/** Execute the operation, constructing the specified file reader. */
+ @Override
public FileSKVIterator build() throws IOException {
validate();
return openReader(this);
@@ -473,6 +515,11 @@ public abstract class FileOperations {
public NeedsFileSystem<ReturnType> forFile(String filename);
}
+ // NOTE(review): interface name misspells "Output"; renaming would touch all
+ // call sites (e.g. newWriterBuilder's return type), so it is left as-is here.
+ public static interface NeedsFileOrOuputStream<ReturnType> extends NeedsFile<ReturnType> {
+ /**
+ * Specify the output stream this operation should write to, along with a file-name
+ * extension (e.g. ".rf") used to select the file format, and the Hadoop configuration.
+ */
+ public NeedsTableConfiguration<ReturnType> forOutputStream(String extension, FSDataOutputStream out, Configuration fsConf);
+ }
+
/**
* Type wrapper to ensure that {@code inFileSystem(...)} is called before other methods.
*/
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 981a2e6..f6269e7 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -412,7 +412,7 @@ public class RFile {
public void append(Key key, Value value) throws IOException {
if (key.compareTo(prevKey) < 0) {
- throw new IllegalStateException("Keys appended out-of-order. New key " + key + ", previous key " + prevKey);
+ throw new IllegalArgumentException("Keys appended out-of-order. New key " + key + ", previous key " + prevKey);
}
currentLocalityGroup.updateColumnCount(key);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 5d15973..96d31ce 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.sample.impl.SamplerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -77,21 +78,8 @@ public class RFileOperations extends FileOperations {
@Override
protected FileSKVWriter openWriter(OpenWriterOperation options) throws IOException {
- Configuration conf = options.getConfiguration();
- AccumuloConfiguration acuconf = options.getTableConfiguration();
- int hrep = conf.getInt("dfs.replication", -1);
- int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
- int rep = hrep;
- if (trep > 0 && trep != hrep) {
- rep = trep;
- }
- long hblock = conf.getLong("dfs.block.size", 1 << 26);
- long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
- long block = hblock;
- if (tblock > 0)
- block = tblock;
- int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+ AccumuloConfiguration acuconf = options.getTableConfiguration();
long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
@@ -106,11 +94,32 @@ public class RFileOperations extends FileOperations {
String compression = options.getCompression();
compression = compression == null ? options.getTableConfiguration().get(Property.TABLE_FILE_COMPRESSION_TYPE) : compression;
- String file = options.getFilename();
- FileSystem fs = options.getFileSystem();
+ FSDataOutputStream outputStream = options.getOutputStream();
+
+ Configuration conf = options.getConfiguration();
+
+ if (outputStream == null) {
+ int hrep = conf.getInt("dfs.replication", -1);
+ int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
+ int rep = hrep;
+ if (trep > 0 && trep != hrep) {
+ rep = trep;
+ }
+ long hblock = conf.getLong("dfs.block.size", 1 << 26);
+ long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
+ long block = hblock;
+ if (tblock > 0)
+ block = tblock;
+ int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+
+ String file = options.getFilename();
+ FileSystem fs = options.getFileSystem();
+
+ outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
+ }
- CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(new RateLimitedOutputStream(fs.create(new Path(file), false, bufferSize, (short) rep, block),
- options.getRateLimiter()), compression, conf, acuconf);
+ CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(new RateLimitedOutputStream(outputStream, options.getRateLimiter()), compression, conf,
+ acuconf);
RFile.Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
return writer;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
index 97e4f5c..8188ba3 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java
@@ -35,12 +35,19 @@ import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.constraints.DefaultKeySizeConstraint;
+import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.data.thrift.IterInfo;
+import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
+import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter;
+import org.apache.accumulo.core.iterators.system.DeletingIterator;
import org.apache.accumulo.core.iterators.system.SynchronizedIterator;
+import org.apache.accumulo.core.iterators.system.VisibilityFilter;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
import org.apache.accumulo.core.tabletserver.thrift.TIteratorSetting;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
@@ -384,4 +391,12 @@ public class IteratorUtil {
}
return toIteratorSettings(ic);
}
+
+ /**
+ * Stacks the standard system scan iterators on top of {@code source}, innermost first:
+ * delete handling ({@link DeletingIterator} with propagation disabled), column family
+ * skipping, column qualifier filtering against {@code cols}, and finally visibility
+ * filtering against {@code auths} with the given default visibility.
+ *
+ * NOTE(review): declares IOException but no call here visibly throws it — presumably
+ * kept for interface symmetry with other iterator setup paths; confirm before removing.
+ */
+ public static SortedKeyValueIterator<Key,Value> setupSystemScanIterators(SortedKeyValueIterator<Key,Value> source, Set<Column> cols, Authorizations auths,
+ byte[] defaultVisibility) throws IOException {
+ DeletingIterator delIter = new DeletingIterator(source, false);
+ ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);
+ ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, cols);
+ return new VisibilityFilter(colFilter, auths, defaultVisibility);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
index f0bd528..8abf4e7 100644
--- a/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/sample/impl/SamplerConfigurationImpl.java
@@ -17,6 +17,8 @@
package org.apache.accumulo.core.sample.impl;
+import static com.google.common.base.Preconditions.checkArgument;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -169,6 +171,16 @@ public class SamplerConfigurationImpl implements Writable {
return className + " " + options;
}
+ /**
+ * Verifies that the given properties do not overlap the table properties derived from the
+ * sampler configuration, throwing IllegalArgumentException on any collision.
+ */
+ public static void checkDisjoint(Map<String,String> props, SamplerConfiguration samplerConfiguration) {
+ // Nothing to validate when either side contributes no properties.
+ if (samplerConfiguration == null || props.isEmpty()) {
+ return;
+ }
+
+ Map<String,String> derived = new SamplerConfigurationImpl(samplerConfiguration).toTablePropertiesMap();
+
+ checkArgument(Collections.disjoint(props.keySet(), derived.keySet()), "Properties and derived sampler properties are not disjoint");
+ }
+
public static TSamplerConfiguration toThrift(SamplerConfiguration samplerConfig) {
if (samplerConfig == null)
return null;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/61a7de4a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
index 07757a6..a5255c7 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/LocalityGroupUtil.java
@@ -49,6 +49,8 @@ public class LocalityGroupUtil {
public static final Set<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
public static Set<ByteSequence> families(Collection<Column> columns) {
+ if (columns.size() == 0)
+ return EMPTY_CF_SET;
Set<ByteSequence> result = new HashSet<ByteSequence>(columns.size());
for (Column col : columns) {
result.add(new ArrayByteSequence(col.getColumnFamily()));