You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/04/30 00:32:03 UTC
[2/2] accumulo git commit: ACCUMULO-4195 Added generalized
configuration objects for RFile interaction.
ACCUMULO-4195 Added generalized configuration objects for RFile interaction.
Implemented a builder/fluent style of syntax, where each operation creates an operation object with both some methods for setting
parameters and an execute() method to actually invoke the operation.
Closes apache/accumulo#95
Signed-off-by: Josh Elser <el...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4a79d586
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4a79d586
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4a79d586
Branch: refs/heads/master
Commit: 4a79d586d190a1eda7141217054da856ee49f202
Parents: a605d22
Author: Shawn Walker <ac...@shawn-walker.net>
Authored: Tue Apr 19 15:54:31 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Apr 29 11:51:59 2016 -0400
----------------------------------------------------------------------
.../core/client/impl/OfflineIterator.java | 2 +-
.../client/mapred/AccumuloFileOutputFormat.java | 3 +-
.../mapreduce/AccumuloFileOutputFormat.java | 3 +-
.../core/client/mock/MockTableOperations.java | 7 +-
.../accumulo/core/file/BloomFilterLayer.java | 4 +-
.../core/file/DispatchingFileFactory.java | 95 ++--
.../accumulo/core/file/FileOperations.java | 455 ++++++++++++++++++-
.../core/file/map/MapFileOperations.java | 56 +--
.../accumulo/core/file/rfile/CreateEmpty.java | 4 +-
.../core/file/rfile/RFileOperations.java | 82 ++--
.../client/mock/MockTableOperationsTest.java | 3 +-
.../core/file/BloomFilterLayerLookupTest.java | 4 +-
.../accumulo/core/file/FileOperationsTest.java | 2 +-
.../accumulo/server/client/BulkImporter.java | 3 +-
.../apache/accumulo/server/init/Initialize.java | 3 +-
.../apache/accumulo/server/util/FileUtil.java | 24 +-
.../server/client/BulkImporterTest.java | 3 +-
.../apache/accumulo/tserver/FileManager.java | 4 +-
.../apache/accumulo/tserver/InMemoryMap.java | 5 +-
.../compaction/MajorCompactionRequest.java | 3 +-
.../accumulo/tserver/tablet/Compactor.java | 6 +-
.../apache/accumulo/tserver/tablet/Tablet.java | 3 +-
.../accumulo/tserver/tablet/TabletData.java | 3 +-
.../accumulo/test/BulkImportMonitoringIT.java | 5 +-
.../apache/accumulo/test/CreateRandomRFile.java | 2 +-
.../accumulo/test/GenerateSequentialRFile.java | 3 +-
.../apache/accumulo/test/GetFileInfoBulkIT.java | 5 +-
.../org/apache/accumulo/test/ShellServerIT.java | 4 +-
.../org/apache/accumulo/test/TestIngest.java | 4 +-
.../accumulo/test/functional/BulkFileIT.java | 9 +-
.../test/mapred/AccumuloFileOutputFormatIT.java | 4 +-
.../mapreduce/AccumuloFileOutputFormatIT.java | 4 +-
.../performance/metadata/FastBulkImportIT.java | 4 +-
.../performance/scan/CollectTabletStats.java | 6 +-
.../accumulo/test/proxy/SimpleProxyBase.java | 3 +-
.../test/randomwalk/bulk/BulkPlusOne.java | 3 +-
.../test/randomwalk/security/TableOp.java | 4 +-
37 files changed, 595 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
index 20c53e1..87abd0b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
@@ -345,7 +345,7 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
// TODO need to close files - ACCUMULO-1303
for (String file : absFiles) {
FileSystem fs = VolumeConfiguration.getVolume(file, conf, config).getFileSystem();
- FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, null, acuTableConf, null, null);
+ FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder().forFile(file, fs, conf).withTableConfiguration(acuTableConf).build();
if (scannerSamplerConfigImpl != null) {
reader = reader.getSample(scannerSamplerConfigImpl);
if (reader == null)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
index 8fa5f62..45796cb 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
@@ -186,7 +186,8 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
}
if (out == null) {
- out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, null, acuConf);
+ out = FileOperations.getInstance().newWriterBuilder().forFile(file.toString(), file.getFileSystem(conf), conf).withTableConfiguration(acuConf)
+ .build();
out.startDefaultLocalityGroup();
}
out.append(key, value);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
index 2d62279..bf0474e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
@@ -184,7 +184,8 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
}
if (out == null) {
- out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, null, acuConf);
+ out = FileOperations.getInstance().newWriterBuilder().forFile(file.toString(), file.getFileSystem(conf), conf).withTableConfiguration(acuConf)
+ .build();
out.startDefaultLocalityGroup();
}
out.append(key, value);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
index aa64a10..fd1128c 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
@@ -16,8 +16,6 @@
*/
package org.apache.accumulo.core.client.mock;
-import static com.google.common.base.Preconditions.checkArgument;
-
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
@@ -67,6 +65,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkArgument;
/**
* @deprecated since 1.8.0; use MiniAccumuloCluster or a standard mock framework instead.
@@ -289,8 +288,8 @@ class MockTableOperations extends TableOperationsHelper {
*/
for (FileStatus importStatus : fs.listStatus(importPath)) {
try {
- FileSKVIterator importIterator = FileOperations.getInstance().openReader(importStatus.getPath().toString(), true, fs, fs.getConf(), null,
- AccumuloConfiguration.getDefaultConfiguration());
+ FileSKVIterator importIterator = FileOperations.getInstance().newReaderBuilder().forFile(importStatus.getPath().toString(), fs, fs.getConf())
+ .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).seekToBeginning().build();
while (importIterator.hasTop()) {
Key key = importIterator.getTopKey();
Value value = importIterator.getTopValue();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
index c9918bd..c9ac8b6 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
@@ -458,7 +458,7 @@ public class BloomFilterLayer {
String suffix = FileOperations.getNewFileExtension(acuconf);
String fname = "/tmp/test." + suffix;
- FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, null, acuconf);
+ FileSKVWriter bmfw = FileOperations.getInstance().newWriterBuilder().forFile(fname, fs, conf).withTableConfiguration(acuconf).build();
long t1 = System.currentTimeMillis();
@@ -477,7 +477,7 @@ public class BloomFilterLayer {
bmfw.close();
t1 = System.currentTimeMillis();
- FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, null, acuconf);
+ FileSKVIterator bmfr = FileOperations.getInstance().newReaderBuilder().forFile(fname, fs, conf).withTableConfiguration(acuconf).build();
t2 = System.currentTimeMillis();
out.println("Opened " + fname + " in " + (t2 - t1));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
index 9478a29..c7d8248 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
@@ -17,25 +17,18 @@
package org.apache.accumulo.core.file;
import java.io.IOException;
-import java.util.Set;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.ByteSequence;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
import org.apache.accumulo.core.file.map.MapFileOperations;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.file.rfile.RFileOperations;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
class DispatchingFileFactory extends FileOperations {
- private FileOperations findFileFactory(String file) {
+ private FileOperations findFileFactory(FileAccessOperation<?> options) {
+ String file = options.getFilename();
Path p = new Path(file);
String name = p.getName();
@@ -60,78 +53,52 @@ class DispatchingFileFactory extends FileOperations {
}
}
- @Override
- public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- return findFileFactory(file).openIndex(file, fs, conf, acuconf, null, null);
- }
-
- @Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
- AccumuloConfiguration acuconf) throws IOException {
- FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf, null, null);
- if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
- return new BloomFilterLayer.Reader(iter, acuconf);
+ /** If the table configuration disallows caching, rewrite the options object to not pass the caches. */
+ private static <T extends FileReaderOperation<T>> T selectivelyDisableCaches(T input) {
+ if (!input.getTableConfiguration().getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) {
+ input = input.withIndexCache(null);
}
- return iter;
- }
-
- @Override
- public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf) throws IOException {
- FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, writeLimiter, acuconf);
- if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
- return new BloomFilterLayer.Writer(writer, acuconf);
+ if (!input.getTableConfiguration().getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) {
+ input = input.withDataCache(null);
}
- return writer;
+ return input;
}
@Override
- public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- return findFileFactory(file).getFileSize(file, fs, conf, acuconf);
+ protected long getFileSize(GetFileSizeOperation options) throws IOException {
+ return findFileFactory(options).getFileSize(options);
}
@Override
- public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException {
- return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, readLimiter, tableConf, null, null);
+ protected FileSKVWriter openWriter(OpenWriterOperation options) throws IOException {
+ FileSKVWriter writer = findFileFactory(options).openWriter(options);
+ if (options.getTableConfiguration().getBoolean(Property.TABLE_BLOOM_ENABLED)) {
+ return new BloomFilterLayer.Writer(writer, options.getTableConfiguration());
+ } else {
+ return writer;
+ }
}
@Override
- public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
-
- if (!tableConf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
- indexCache = null;
- if (!tableConf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
- dataCache = null;
-
- return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, readLimiter, tableConf, dataCache, indexCache);
+ protected FileSKVIterator openIndex(OpenIndexOperation options) throws IOException {
+ options = selectivelyDisableCaches(options);
+ return findFileFactory(options).openIndex(options);
}
@Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
- AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException {
-
- if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
- indexCache = null;
- if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
- dataCache = null;
-
- FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf, dataCache, indexCache);
- if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
- return new BloomFilterLayer.Reader(iter, acuconf);
+ protected FileSKVIterator openReader(OpenReaderOperation options) throws IOException {
+ options = selectivelyDisableCaches(options);
+ FileSKVIterator iter = findFileFactory(options).openReader(options);
+ if (options.getTableConfiguration().getBoolean(Property.TABLE_BLOOM_ENABLED)) {
+ return new BloomFilterLayer.Reader(iter, options.getTableConfiguration());
+ } else {
+ return iter;
}
- return iter;
}
@Override
- public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache)
- throws IOException {
-
- if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
- iCache = null;
- if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
- dCache = null;
-
- return findFileFactory(file).openIndex(file, fs, conf, acuconf, dCache, iCache);
+ protected FileSKVIterator openScanReader(OpenScanReaderOperation options) throws IOException {
+ options = selectivelyDisableCaches(options);
+ return findFileFactory(options).openScanReader(options);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index dc7c646..797324b 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.file;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Set;
import org.apache.accumulo.core.Constants;
@@ -48,38 +49,456 @@ public abstract class FileOperations {
return new DispatchingFileFactory();
}
+ //
+ // Abstract methods (to be implemented by subclasses)
+ //
+
+ protected abstract long getFileSize(GetFileSizeOperation options) throws IOException;
+
+ protected abstract FileSKVWriter openWriter(OpenWriterOperation options) throws IOException;
+
+ protected abstract FileSKVIterator openIndex(OpenIndexOperation options) throws IOException;
+
+ protected abstract FileSKVIterator openScanReader(OpenScanReaderOperation options) throws IOException;
+
+ protected abstract FileSKVIterator openReader(OpenReaderOperation options) throws IOException;
+
+ //
+ // File operations
+ //
+
/**
- * Open a reader that will not be seeked giving an initial seek location. This is useful for file operations that only need to scan data within a range and do
- * not need to seek. Therefore file metadata such as indexes does not need to be kept in memory while the file is scanned. Also seek optimizations like bloom
- * filters do not need to be loaded.
+ * Construct an operation object allowing one to query the size of a file. <br>
+ * Syntax:
*
+ * <pre>
+ * long size = fileOperations.getFileSize().forFile(filename, fileSystem, fsConfiguration).withTableConfiguration(tableConf).execute();
+ * </pre>
*/
+ @SuppressWarnings("unchecked")
+ public NeedsFile<GetFileSizeOperationBuilder> getFileSize() {
+ return (NeedsFile) new GetFileSizeOperation();
+ }
- public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;
+ /**
+ * Construct an operation object allowing one to create a writer for a file. <br>
+ * Syntax:
+ *
+ * <pre>
+ * FileSKVWriter writer = fileOperations.newWriterBuilder()
+ * .forFile(...)
+ * .withTableConfiguration(...)
+ * .withRateLimiter(...) // optional
+ * .withCompression(...) // optional
+ * .build();
+ * </pre>
+ */
+ @SuppressWarnings("unchecked")
+ public NeedsFile<OpenWriterOperationBuilder> newWriterBuilder() {
+ return (NeedsFile) new OpenWriterOperation();
+ }
- public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
+ /**
+ * Construct an operation object allowing one to create an index iterator for a file. <br>
+ * Syntax:
+ *
+ * <pre>
+ * FileSKVIterator iterator = fileOperations.newIndexReaderBuilder()
+ * .forFile(...)
+ * .withTableConfiguration(...)
+ * .withRateLimiter(...) // optional
+ * .withBlockCache(...) // optional
+ * .build();
+ * </pre>
+ */
+ @SuppressWarnings("unchecked")
+ public NeedsFile<OpenIndexOperationBuilder> newIndexReaderBuilder() {
+ return (NeedsFile) new OpenIndexOperation();
+ }
/**
- * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
+ * Construct an operation object allowing one to create a "scan" reader for a file. Scan readers do not have any optimizations for seeking beyond their
+ * initial position. This is useful for file operations that only need to scan data within a range and do not need to seek. Therefore file metadata such as
+ * indexes does not need to be kept in memory while the file is scanned. Also seek optimizations like bloom filters do not need to be loaded. <br>
+ * Syntax:
*
+ * <pre>
+ * FileSKVIterator scanner = fileOperations.newScanReaderBuilder()
+ * .forFile(...)
+ * .withTableConfiguration(...)
+ * .overRange(...)
+ * .withRateLimiter(...) // optional
+ * .withBlockCache(...) // optional
+ * .build();
+ * </pre>
+ */
+ @SuppressWarnings("unchecked")
+ public NeedsFile<NeedsRange<OpenScanReaderOperationBuilder>> newScanReaderBuilder() {
+ return (NeedsFile) new OpenScanReaderOperation();
+ }
+
+ /**
+ * Construct an operation object allowing one to create a reader for a file. A reader constructed in this manner fully supports seeking, and also enables any
+ * optimizations related to seeking (e.g. Bloom filters). <br>
+ * Syntax:
+ *
+ * <pre>
+ * FileSKVIterator scanner = fileOperations.newReaderBuilder()
+ * .forFile(...)
+ * .withTableConfiguration(...)
+ * .withRateLimiter(...) // optional
+ * .withBlockCache(...) // optional
+ * .seekToBeginning(...) // optional
+ * .build();
+ * </pre>
+ */
+ @SuppressWarnings("unchecked")
+ public NeedsFile<OpenReaderOperationBuilder> newReaderBuilder() {
+ return (NeedsFile) new OpenReaderOperation();
+ }
+
+ //
+ // Domain specific embedded language for execution of operations.
+ //
+ // Here, for each ...Operation class which is a POJO holding a group of parameters,
+ // we have a parallel ...OperationBuilder interface which only exposes the setters / execute methods.
+ // This allows us to expose only the setter/execute methods to upper layers, while
+ // allowing lower layers the freedom to both get and set.
+ //
+
+ /**
+ * Options common to all FileOperations.
*/
+ protected static class FileAccessOperation<SubclassType extends FileAccessOperation<SubclassType>> {
+ private AccumuloConfiguration tableConfiguration;
+
+ private String filename;
+ private FileSystem fs;
+ private Configuration fsConf;
+
+ /** Specify the table configuration defining access to this file. */
+ @SuppressWarnings("unchecked")
+ public SubclassType withTableConfiguration(AccumuloConfiguration tableConfiguration) {
+ this.tableConfiguration = tableConfiguration;
+ return (SubclassType) this;
+ }
+
+ /** Specify the file this operation should apply to. */
+ @SuppressWarnings("unchecked")
+ public SubclassType forFile(String filename, FileSystem fs, Configuration fsConf) {
+ this.filename = filename;
+ this.fs = fs;
+ this.fsConf = fsConf;
+ return (SubclassType) this;
+ }
- public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
- AccumuloConfiguration acuconf) throws IOException;
+ /** Specify the file this operation should apply to. */
+ @SuppressWarnings("unchecked")
+ public SubclassType forFile(String filename) {
+ this.filename = filename;
+ return (SubclassType) this;
+ }
- public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
- AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException;
+ /** Specify the filesystem which this operation should apply to, along with its configuration. */
+ @SuppressWarnings("unchecked")
+ public SubclassType inFileSystem(FileSystem fs, Configuration fsConf) {
+ this.fs = fs;
+ this.fsConf = fsConf;
+ return (SubclassType) this;
+ }
- public abstract FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf)
- throws IOException;
+ public String getFilename() {
+ return filename;
+ }
- public abstract FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException;
+ public FileSystem getFileSystem() {
+ return fs;
+ }
- public abstract FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache)
- throws IOException;
+ public Configuration getConfiguration() {
+ return fsConf;
+ }
- public abstract long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException;
+ public AccumuloConfiguration getTableConfiguration() {
+ return tableConfiguration;
+ }
+
+ /** Check for null parameters. */
+ protected void validate() {
+ Objects.requireNonNull(getFilename());
+ Objects.requireNonNull(getFileSystem());
+ Objects.requireNonNull(getConfiguration());
+ Objects.requireNonNull(getTableConfiguration());
+ }
+ }
+
+ /** Builder interface parallel to {@link FileAccessOperation}. */
+ protected static interface FileAccessOperationBuilder<SubbuilderType> extends NeedsFile<SubbuilderType>, NeedsFileSystem<SubbuilderType>,
+ NeedsTableConfiguration<SubbuilderType> {
+ // no optional/generic methods.
+ }
+
+ /**
+ * Operation object for performing {@code getFileSize()} operations.
+ */
+ protected class GetFileSizeOperation extends FileAccessOperation<GetFileSizeOperation> implements GetFileSizeOperationBuilder {
+ /** Return the size of the file. */
+ public long execute() throws IOException {
+ validate();
+ return getFileSize(this);
+ }
+ }
+
+ /** Builder interface for {@link GetFileSizeOperation}, allowing execution of {@code getFileSize()} operations. */
+ public static interface GetFileSizeOperationBuilder extends FileAccessOperationBuilder<GetFileSizeOperationBuilder> {
+ /** Return the size of the file. */
+ public long execute() throws IOException;
+ }
+
+ /**
+ * Options common to all {@code FileOperation}s which perform reading or writing.
+ */
+ protected static class FileIOOperation<SubclassType extends FileIOOperation<SubclassType>> extends FileAccessOperation<SubclassType> {
+ private RateLimiter rateLimiter;
+
+ /** Specify a rate limiter for this operation. */
+ @SuppressWarnings("unchecked")
+ public SubclassType withRateLimiter(RateLimiter rateLimiter) {
+ this.rateLimiter = rateLimiter;
+ return (SubclassType) this;
+ }
+
+ public RateLimiter getRateLimiter() {
+ return rateLimiter;
+ }
+ }
+
+ /** Builder interface parallel to {@link FileIOOperation}. */
+ protected static interface FileIOOperationBuilder<SubbuilderType> extends FileAccessOperationBuilder<SubbuilderType> {
+ /** Specify a rate limiter for this operation. */
+ public SubbuilderType withRateLimiter(RateLimiter rateLimiter);
+ }
+
+ /**
+ * Operation object for constructing a writer.
+ */
+ protected class OpenWriterOperation extends FileIOOperation<OpenWriterOperation> implements OpenWriterOperationBuilder {
+ private String compression;
+
+ public OpenWriterOperation withCompression(String compression) {
+ this.compression = compression;
+ return this;
+ }
+
+ public String getCompression() {
+ return compression;
+ }
+
+ public FileSKVWriter build() throws IOException {
+ validate();
+ return openWriter(this);
+ }
+ }
+
+ /** Builder interface parallel to {@link OpenWriterOperation}. */
+ public static interface OpenWriterOperationBuilder extends FileIOOperationBuilder<OpenWriterOperationBuilder> {
+ /** Set the compression type. */
+ public OpenWriterOperationBuilder withCompression(String compression);
+
+ /** Construct the writer. */
+ public FileSKVWriter build() throws IOException;
+ }
+
+ /**
+ * Options common to all {@code FileOperations} which perform reads.
+ */
+ protected static class FileReaderOperation<SubclassType extends FileReaderOperation<SubclassType>> extends FileIOOperation<SubclassType> {
+ private BlockCache dataCache;
+ private BlockCache indexCache;
+
+ /** (Optional) Set the block cache pair to be used to optimize reads within the constructed reader. */
+ @SuppressWarnings("unchecked")
+ public SubclassType withBlockCache(BlockCache dataCache, BlockCache indexCache) {
+ this.dataCache = dataCache;
+ this.indexCache = indexCache;
+ return (SubclassType) this;
+ }
+
+ /** (Optional) set the data cache to be used to optimize reads within the constructed reader. */
+ @SuppressWarnings("unchecked")
+ public SubclassType withDataCache(BlockCache dataCache) {
+ this.dataCache = dataCache;
+ return (SubclassType) this;
+ }
+
+ /** (Optional) set the index cache to be used to optimize reads within the constructed reader. */
+ @SuppressWarnings("unchecked")
+ public SubclassType withIndexCache(BlockCache indexCache) {
+ this.indexCache = indexCache;
+ return (SubclassType) this;
+ }
+
+ public BlockCache getDataCache() {
+ return dataCache;
+ }
+
+ public BlockCache getIndexCache() {
+ return indexCache;
+ }
+ }
+
+ /** Builder interface parallel to {@link FileReaderOperation}. */
+ protected static interface FileReaderOperationBuilder<SubbuilderType> extends FileIOOperationBuilder<SubbuilderType> {
+ /** (Optional) Set the block cache pair to be used to optimize reads within the constructed reader. */
+ public SubbuilderType withBlockCache(BlockCache dataCache, BlockCache indexCache);
+
+ /** (Optional) set the data cache to be used to optimize reads within the constructed reader. */
+ public SubbuilderType withDataCache(BlockCache dataCache);
+
+ /** (Optional) set the index cache to be used to optimize reads within the constructed reader. */
+ public SubbuilderType withIndexCache(BlockCache indexCache);
+ }
+
+ /**
+ * Operation object for opening an index.
+ */
+ protected class OpenIndexOperation extends FileReaderOperation<OpenIndexOperation> implements OpenIndexOperationBuilder {
+ public FileSKVIterator build() throws IOException {
+ validate();
+ return openIndex(this);
+ }
+ }
+
+ /** Builder interface parallel to {@link OpenIndexOperation}. */
+ public static interface OpenIndexOperationBuilder extends FileReaderOperationBuilder<OpenIndexOperationBuilder> {
+ /** Construct the reader. */
+ public FileSKVIterator build() throws IOException;
+ }
+
+ /** Operation object for opening a scan reader. */
+ protected class OpenScanReaderOperation extends FileReaderOperation<OpenScanReaderOperation> implements OpenScanReaderOperationBuilder {
+ private Range range;
+ private Set<ByteSequence> columnFamilies;
+ private boolean inclusive;
+
+ /** Set the range over which the constructed iterator will search. */
+ public OpenScanReaderOperation overRange(Range range, Set<ByteSequence> columnFamilies, boolean inclusive) {
+ this.range = range;
+ this.columnFamilies = columnFamilies;
+ this.inclusive = inclusive;
+ return this;
+ }
+
+ /** The range over which this reader should scan. */
+ public Range getRange() {
+ return range;
+ }
+
+ /** The column families which this reader should scan. */
+ public Set<ByteSequence> getColumnFamilies() {
+ return columnFamilies;
+ }
+
+ public boolean isRangeInclusive() {
+ return inclusive;
+ }
+
+ @Override
+ protected void validate() {
+ super.validate();
+ Objects.requireNonNull(range);
+ Objects.requireNonNull(columnFamilies);
+ }
+
+ /** Execute the operation, constructing a scan iterator. */
+ public FileSKVIterator build() throws IOException {
+ validate();
+ return openScanReader(this);
+ }
+ }
+
+ /** Builder interface parallel to {@link OpenScanReaderOperation}. */
+ public static interface OpenScanReaderOperationBuilder extends FileReaderOperationBuilder<OpenScanReaderOperationBuilder>,
+ NeedsRange<OpenScanReaderOperationBuilder> {
+ /** Execute the operation, constructing a scan iterator. */
+ public FileSKVIterator build() throws IOException;
+ }
+
+ /** Operation object for opening a full reader. */
+ protected class OpenReaderOperation extends FileReaderOperation<OpenReaderOperation> implements OpenReaderOperationBuilder {
+ private boolean seekToBeginning = false;
+
+ /**
+ * Seek the constructed iterator to the beginning of its domain before returning. Equivalent to {@code seekToBeginning(true)}.
+ */
+ public OpenReaderOperation seekToBeginning() {
+ return seekToBeginning(true);
+ }
+
+ /** If true, seek the constructed iterator to the beginning of its domain before returning. */
+ public OpenReaderOperation seekToBeginning(boolean seekToBeginning) {
+ this.seekToBeginning = seekToBeginning;
+ return this;
+ }
+
+ public boolean isSeekToBeginning() {
+ return seekToBeginning;
+ }
+
+ /** Execute the operation, constructing the specified file reader. */
+ public FileSKVIterator build() throws IOException {
+ validate();
+ return openReader(this);
+ }
+ }
+
+ /** Builder parallel to {@link OpenReaderOperation}. */
+ public static interface OpenReaderOperationBuilder extends FileReaderOperationBuilder<OpenReaderOperationBuilder> {
+ /**
+ * Seek the constructed iterator to the beginning of its domain before returning. Equivalent to {@code seekToBeginning(true)}.
+ */
+ public OpenReaderOperationBuilder seekToBeginning();
+
+ /** If true, seek the constructed iterator to the beginning of its domain before returning. */
+ public OpenReaderOperationBuilder seekToBeginning(boolean seekToBeginning);
+
+ /** Execute the operation, constructing the specified file reader. */
+ public FileSKVIterator build() throws IOException;
+ }
+
+ /**
+ * Type wrapper to ensure that {@code forFile(...)} is called before other methods.
+ */
+ public static interface NeedsFile<ReturnType> {
+ /** Specify the file this operation should apply to. */
+ public NeedsTableConfiguration<ReturnType> forFile(String filename, FileSystem fs, Configuration fsConf);
+
+ /** Specify the file this operation should apply to. */
+ public NeedsFileSystem<ReturnType> forFile(String filename);
+ }
+
+ /**
+ * Type wrapper to ensure that {@code inFileSystem(...)} is called before other methods.
+ */
+ public static interface NeedsFileSystem<ReturnType> {
+ /** Specify the {@link FileSystem} that this operation operates on, along with an alternate configuration. */
+ public NeedsTableConfiguration<ReturnType> inFileSystem(FileSystem fs, Configuration fsConf);
+ }
+
+ /**
+ * Type wrapper to ensure that {@code withTableConfiguration(...)} is called before other methods.
+ */
+ public static interface NeedsTableConfiguration<ReturnType> {
+ /** Specify the table configuration defining access to this file. */
+ public ReturnType withTableConfiguration(AccumuloConfiguration tableConfiguration);
+ }
+
+ /**
+ * Type wrapper to ensure that {@code overRange(...)} is called before other methods.
+ */
+ public static interface NeedsRange<ReturnType> {
+ /** Set the range over which the constructed iterator will search. */
+ public ReturnType overRange(Range range, Set<ByteSequence> columnFamilies, boolean inclusive);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
index a72a243..10ca253 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
@@ -21,10 +21,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
@@ -32,15 +30,11 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.MapFileIterator;
import org.apache.accumulo.core.iterators.system.SequenceFileIterator;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
@@ -142,61 +136,37 @@ public class MapFileOperations extends FileOperations {
}
@Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
- AccumuloConfiguration acuconf) throws IOException {
- FileSKVIterator iter = new RangeIterator(new MapFileIterator(acuconf, fs, file, conf));
-
- if (seekToBeginning)
+ protected FileSKVIterator openReader(OpenReaderOperation options) throws IOException {
+ FileSKVIterator iter = new RangeIterator(new MapFileIterator(options.getTableConfiguration(), options.getFileSystem(), options.getFilename(),
+ options.getConfiguration()));
+ if (options.isSeekToBeginning()) {
iter.seek(new Range(new Key(), null), new ArrayList<ByteSequence>(), false);
-
+ }
return iter;
}
@Override
- public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf) throws IOException {
+ protected FileSKVWriter openWriter(OpenWriterOperation options) throws IOException {
throw new UnsupportedOperationException();
}
@Override
- public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- return new SequenceFileIterator(MapFileUtil.openIndex(conf, fs, new Path(file)), false);
+ protected FileSKVIterator openIndex(OpenIndexOperation options) throws IOException {
+ return new SequenceFileIterator(MapFileUtil.openIndex(options.getConfiguration(), options.getFileSystem(), new Path(options.getFilename())), false);
}
@Override
- public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- return fs.getFileStatus(new Path(file + "/" + MapFile.DATA_FILE_NAME)).getLen();
+ protected long getFileSize(GetFileSizeOperation options) throws IOException {
+ return options.getFileSystem().getFileStatus(new Path(options.getFilename() + "/" + MapFile.DATA_FILE_NAME)).getLen();
}
@Override
- public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException {
- MapFileIterator mfIter = new MapFileIterator(tableConf, fs, file, conf);
+ protected FileSKVIterator openScanReader(OpenScanReaderOperation options) throws IOException {
+ MapFileIterator mfIter = new MapFileIterator(options.getTableConfiguration(), options.getFileSystem(), options.getFilename(), options.getConfiguration());
FileSKVIterator iter = new RangeIterator(mfIter);
-
- iter.seek(range, columnFamilies, inclusive);
+ iter.seek(options.getRange(), options.getColumnFamilies(), options.isRangeInclusive());
return iter;
}
-
- @Override
- public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
-
- return openReader(file, range, columnFamilies, inclusive, fs, conf, readLimiter, tableConf);
- }
-
- @Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
- AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException {
-
- return openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf);
- }
-
- @Override
- public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache)
- throws IOException {
-
- return openIndex(file, fs, conf, acuconf);
- }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
index 045bdbb..46708a6 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
@@ -77,8 +77,8 @@ public class CreateEmpty {
for (String arg : opts.files) {
Path path = new Path(arg);
log.info("Writing to file '" + path + "'");
- FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, null, DefaultConfiguration.getDefaultConfiguration(),
- opts.codec);
+ FileSKVWriter writer = (new RFileOperations()).newWriterBuilder().forFile(arg, path.getFileSystem(conf), conf)
+ .withTableConfiguration(DefaultConfiguration.getDefaultConfiguration()).withCompression(opts.codec).build();
writer.close();
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 730a9d3..c8b61b6 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.core.file.rfile;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
-import java.util.Set;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -29,14 +28,10 @@ import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
-import org.apache.accumulo.core.file.rfile.RFile.Reader;
-import org.apache.accumulo.core.file.rfile.RFile.Writer;
import org.apache.accumulo.core.sample.Sampler;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.sample.impl.SamplerFactory;
-import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -46,73 +41,45 @@ public class RFileOperations extends FileOperations {
private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
- @Override
- public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- return fs.getFileStatus(new Path(file)).getLen();
- }
-
- @Override
- public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
- return openIndex(file, fs, conf, acuconf, null, null);
- }
-
- private static RFile.Reader getReader(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dataCache,
- BlockCache indexCache) throws IOException {
- Path path = new Path(file);
- CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache, acuconf);
+ private static RFile.Reader getReader(FileReaderOperation<?> options) throws IOException {
+ CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(options.getFileSystem(), new Path(options.getFilename()), options.getConfiguration(),
+ options.getDataCache(), options.getIndexCache(), options.getRateLimiter(), options.getTableConfiguration());
return new RFile.Reader(_cbr);
}
@Override
- public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache)
- throws IOException {
- return getReader(file, fs, conf, acuconf, indexCache, indexCache).getIndex();
+ protected long getFileSize(GetFileSizeOperation options) throws IOException {
+ return options.getFileSystem().getFileStatus(new Path(options.getFilename())).getLen();
}
@Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
- AccumuloConfiguration acuconf) throws IOException {
- return openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf, null, null);
+ protected FileSKVIterator openIndex(OpenIndexOperation options) throws IOException {
+ return getReader(options).getIndex();
}
@Override
- public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
- AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException {
- Path path = new Path(file);
-
- CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache, readLimiter, acuconf);
- Reader iter = new RFile.Reader(_cbr);
+ protected FileSKVIterator openReader(OpenReaderOperation options) throws IOException {
+ RFile.Reader reader = getReader(options);
- if (seekToBeginning) {
- iter.seek(new Range((Key) null, null), EMPTY_CF_SET, false);
+ if (options.isSeekToBeginning()) {
+ reader.seek(new Range((Key) null, null), EMPTY_CF_SET, false);
}
- return iter;
+ return reader;
}
@Override
- public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException {
- FileSKVIterator iter = openReader(file, false, fs, conf, readLimiter, tableConf, null, null);
- iter.seek(range, columnFamilies, inclusive);
- return iter;
+ protected FileSKVIterator openScanReader(OpenScanReaderOperation options) throws IOException {
+ RFile.Reader reader = getReader(options);
+ reader.seek(options.getRange(), options.getColumnFamilies(), options.isRangeInclusive());
+ return reader;
}
@Override
- public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
- RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
- FileSKVIterator iter = openReader(file, false, fs, conf, readLimiter, tableConf, dataCache, indexCache);
- iter.seek(range, columnFamilies, inclusive);
- return iter;
- }
-
- @Override
- public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf) throws IOException {
- return openWriter(file, fs, conf, writeLimiter, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
- }
+ protected FileSKVWriter openWriter(OpenWriterOperation options) throws IOException {
+ Configuration conf = options.getConfiguration();
+ AccumuloConfiguration acuconf = options.getTableConfiguration();
- FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf, String compression)
- throws IOException {
int hrep = conf.getInt("dfs.replication", -1);
int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
int rep = hrep;
@@ -136,9 +103,16 @@ public class RFileOperations extends FileOperations {
sampler = SamplerFactory.newSampler(samplerConfig, acuconf);
}
+ String compression = options.getCompression();
+ compression = compression == null ? options.getTableConfiguration().get(Property.TABLE_FILE_COMPRESSION_TYPE) : compression;
+
+ String file = options.getFilename();
+ FileSystem fs = options.getFileSystem();
+
CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(new RateLimitedOutputStream(fs.create(new Path(file), false, bufferSize, (short) rep, block),
- writeLimiter), compression, conf, acuconf);
- Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
+ options.getRateLimiter()), compression, conf, acuconf);
+
+ RFile.Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
return writer;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java b/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
index ce49f36..9bd1455 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
@@ -225,7 +225,8 @@ public class MockTableOperationsTest {
fs.delete(tempFile, true);
fs.mkdirs(failures);
fs.mkdirs(tempFile.getParent());
- FileSKVWriter writer = FileOperations.getInstance().openWriter(tempFile.toString(), fs, defaultConf, null, AccumuloConfiguration.getDefaultConfiguration());
+ FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder().forFile(tempFile.toString(), fs, defaultConf)
+ .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
writer.startDefaultLocalityGroup();
List<Pair<Key,Value>> keyVals = new ArrayList<Pair<Key,Value>>();
for (int i = 0; i < 5; i++) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java b/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
index 0fb4bd6..5470722 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
@@ -80,7 +80,7 @@ public class BloomFilterLayerLookupTest {
// get output file name
String suffix = FileOperations.getNewFileExtension(acuconf);
String fname = new File(tempDir.getRoot(), testName + "." + suffix).getAbsolutePath();
- FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, null, acuconf);
+ FileSKVWriter bmfw = FileOperations.getInstance().newWriterBuilder().forFile(fname, fs, conf).withTableConfiguration(acuconf).build();
// write data to file
long t1 = System.currentTimeMillis();
@@ -96,7 +96,7 @@ public class BloomFilterLayerLookupTest {
bmfw.close();
t1 = System.currentTimeMillis();
- FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, null, acuconf);
+ FileSKVIterator bmfr = FileOperations.getInstance().newReaderBuilder().forFile(fname, fs, conf).withTableConfiguration(acuconf).build();
t2 = System.currentTimeMillis();
LOG.debug("Opened " + fname + " in " + (t2 - t1));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java b/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
index e8ccf35..a8e4b7f 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
@@ -51,7 +51,7 @@ public class FileOperationsTest {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
- writer = fileOperations.openWriter(filename, fs, conf, null, acuconf);
+ writer = fileOperations.newWriterBuilder().forFile(filename, fs, conf).withTableConfiguration(acuconf).build();
writer.close();
} catch (Exception ex) {
caughtException = true;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index cf55e35..e27ce52 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -642,7 +642,8 @@ public class BulkImporter {
String filename = file.toString();
// log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow);
FileSystem fs = vm.getVolumeByPath(file).getFileSystem();
- FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), null, context.getConfiguration());
+ FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder().forFile(filename, fs, fs.getConf())
+ .withTableConfiguration(context.getConfiguration()).seekToBeginning().build();
try {
Text row = startRow;
if (row == null)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index e58a79c..5a4cac1 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -465,7 +465,8 @@ public class Initialize implements KeywordExecutable {
createEntriesForTablet(sorted, tablet);
}
FileSystem fs = volmanager.getVolumeByPath(new Path(fileName)).getFileSystem();
- FileSKVWriter tabletWriter = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), null, AccumuloConfiguration.getDefaultConfiguration());
+ FileSKVWriter tabletWriter = FileOperations.getInstance().newWriterBuilder().forFile(fileName, fs, fs.getConf())
+ .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
tabletWriter.startDefaultLocalityGroup();
for (Entry<Key,Value> entry : sorted.entrySet()) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
index 9291808..491a830 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
@@ -132,7 +132,7 @@ public class FileUtil {
outFiles.add(newMapFile);
FileSystem ns = fs.getVolumeByPath(new Path(newMapFile)).getFileSystem();
- FileSKVWriter writer = new RFileOperations().openWriter(newMapFile.toString(), ns, ns.getConf(), null, acuConf);
+ FileSKVWriter writer = new RFileOperations().newWriterBuilder().forFile(newMapFile.toString(), ns, ns.getConf()).withTableConfiguration(acuConf).build();
writer.startDefaultLocalityGroup();
List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(inFiles.size());
@@ -140,7 +140,7 @@ public class FileUtil {
try {
for (String s : inFiles) {
ns = fs.getVolumeByPath(new Path(s)).getFileSystem();
- reader = FileOperations.getInstance().openIndex(s, ns, ns.getConf(), acuConf);
+ reader = FileOperations.getInstance().newIndexReaderBuilder().forFile(s, ns, ns.getConf()).withTableConfiguration(acuConf).build();
iters.add(reader);
}
@@ -401,10 +401,10 @@ public class FileUtil {
FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
try {
if (useIndex)
- reader = FileOperations.getInstance().openIndex(path.toString(), ns, ns.getConf(), acuConf);
+ reader = FileOperations.getInstance().newIndexReaderBuilder().forFile(path.toString(), ns, ns.getConf()).withTableConfiguration(acuConf).build();
else
- reader = FileOperations.getInstance().openReader(path.toString(), new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false,
- ns, ns.getConf(), null, acuConf);
+ reader = FileOperations.getInstance().newScanReaderBuilder().forFile(path.toString(), ns, ns.getConf()).withTableConfiguration(acuConf)
+ .overRange(new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false).build();
while (reader.hasTop()) {
Key key = reader.getTopKey();
@@ -425,10 +425,10 @@ public class FileUtil {
}
if (useIndex)
- readers.add(FileOperations.getInstance().openIndex(path.toString(), ns, ns.getConf(), acuConf));
+ readers.add(FileOperations.getInstance().newIndexReaderBuilder().forFile(path.toString(), ns, ns.getConf()).withTableConfiguration(acuConf).build());
else
- readers.add(FileOperations.getInstance().openReader(path.toString(), new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false,
- ns, ns.getConf(), null, acuConf));
+ readers.add(FileOperations.getInstance().newScanReaderBuilder().forFile(path.toString(), ns, ns.getConf()).withTableConfiguration(acuConf)
+ .overRange(new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false).build());
}
return numKeys;
@@ -445,7 +445,7 @@ public class FileUtil {
FileSKVIterator reader = null;
FileSystem ns = fs.getVolumeByPath(mapfile.path()).getFileSystem();
try {
- reader = FileOperations.getInstance().openReader(mapfile.toString(), false, ns, ns.getConf(), null, acuConf);
+ reader = FileOperations.getInstance().newReaderBuilder().forFile(mapfile.toString(), ns, ns.getConf()).withTableConfiguration(acuConf).build();
Key firstKey = reader.getFirstKey();
if (firstKey != null) {
@@ -479,7 +479,8 @@ public class FileUtil {
for (FileRef ref : mapFiles) {
Path path = ref.path();
FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
- FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), null, acuConf);
+ FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder().forFile(path.toString(), ns, ns.getConf()).withTableConfiguration(acuConf)
+ .seekToBeginning().build();
try {
if (!reader.hasTop())
@@ -522,7 +523,8 @@ public class FileUtil {
Text row = new Text();
FileSystem ns = fs.getVolumeByPath(mapFile).getFileSystem();
- FileSKVIterator index = FileOperations.getInstance().openIndex(mapFile.toString(), ns, ns.getConf(), acuConf);
+ FileSKVIterator index = FileOperations.getInstance().newIndexReaderBuilder().forFile(mapFile.toString(), ns, ns.getConf()).withTableConfiguration(acuConf)
+ .build();
try {
while (index.hasTop()) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
index 646e400..09ee6e0 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
@@ -113,7 +113,8 @@ public class BulkImporterTest {
EasyMock.replay(context);
String file = "target/testFile.rf";
fs.delete(new Path(file), true);
- FileSKVWriter writer = FileOperations.getInstance().openWriter(file, fs, fs.getConf(), null, context.getConfiguration());
+ FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder().forFile(file, fs, fs.getConf()).withTableConfiguration(context.getConfiguration())
+ .build();
writer.startDefaultLocalityGroup();
Value empty = new Value(new byte[] {});
writer.append(new Key("a", "cf", "cq"), empty);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index c28d060..699b5a7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@ -316,8 +316,8 @@ public class FileManager {
Path path = new Path(file);
FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
// log.debug("Opening "+file + " path " + path);
- FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), null,
- context.getServerConfigurationFactory().getTableConfiguration(tablet), dataCache, indexCache);
+ FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder().forFile(path.toString(), ns, ns.getConf())
+ .withTableConfiguration(context.getServerConfigurationFactory().getTableConfiguration(tablet)).withBlockCache(dataCache, indexCache).build();
reservedFiles.add(reader);
readersReserved.put(reader, file);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index 5219e33..1451ddb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@ -632,7 +632,8 @@ public class InMemoryMap {
Configuration conf = CachedConfiguration.getInstance();
FileSystem fs = FileSystem.getLocal(conf);
- reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, null, SiteConfiguration.getInstance());
+ reader = new RFileOperations().newReaderBuilder().forFile(memDumpFile, fs, conf).withTableConfiguration(SiteConfiguration.getInstance())
+ .seekToBeginning().build();
if (iflag != null)
reader.setInterruptFlag(iflag);
@@ -804,7 +805,7 @@ public class InMemoryMap {
siteConf = createSampleConfig(siteConf);
}
- FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, null, siteConf);
+ FileSKVWriter out = new RFileOperations().newWriterBuilder().forFile(tmpFile, fs, newConf).withTableConfiguration(siteConf).build();
InterruptibleIterator iter = map.skvIterator(null);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
index 79c0c4f..08bff26 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
@@ -77,7 +77,8 @@ public class MajorCompactionRequest implements Cloneable {
// @TODO ensure these files are always closed?
FileOperations fileFactory = FileOperations.getInstance();
FileSystem ns = volumeManager.getVolumeByPath(ref.path()).getFileSystem();
- FileSKVIterator openReader = fileFactory.openReader(ref.path().toString(), true, ns, ns.getConf(), null, tableConfig);
+ FileSKVIterator openReader = fileFactory.newReaderBuilder().forFile(ref.path().toString(), ns, ns.getConf()).withTableConfiguration(tableConfig)
+ .seekToBeginning().build();
return openReader;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
index bde5be0..79d22ea 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
@@ -197,7 +197,8 @@ public class Compactor implements Callable<CompactionStats> {
try {
FileOperations fileFactory = FileOperations.getInstance();
FileSystem ns = this.fs.getVolumeByPath(outputFilePath).getFileSystem();
- mfw = fileFactory.openWriter(outputFilePathName, ns, ns.getConf(), env.getWriteLimiter(), acuTableConf);
+ mfw = fileFactory.newWriterBuilder().forFile(outputFilePathName, ns, ns.getConf()).withTableConfiguration(acuTableConf)
+ .withRateLimiter(env.getWriteLimiter()).build();
Map<String,Set<ByteSequence>> lGroups;
try {
@@ -285,7 +286,8 @@ public class Compactor implements Callable<CompactionStats> {
FileSystem fs = this.fs.getVolumeByPath(mapFile.path()).getFileSystem();
FileSKVIterator reader;
- reader = fileFactory.openReader(mapFile.path().toString(), false, fs, fs.getConf(), env.getReadLimiter(), acuTableConf);
+ reader = fileFactory.newReaderBuilder().forFile(mapFile.path().toString(), fs, fs.getConf()).withTableConfiguration(acuTableConf)
+ .withRateLimiter(env.getReadLimiter()).build();
readers.add(reader);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index fb47afc..7b59ceb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1584,7 +1584,8 @@ public class Tablet implements TabletCommitter {
for (Entry<FileRef,DataFileValue> entry : allFiles.entrySet()) {
FileRef file = entry.getKey();
FileSystem ns = fs.getVolumeByPath(file.path()).getFileSystem();
- FileSKVIterator openReader = fileFactory.openReader(file.path().toString(), true, ns, ns.getConf(), null, this.getTableConfiguration());
+ FileSKVIterator openReader = fileFactory.newReaderBuilder().forFile(file.path().toString(), ns, ns.getConf())
+ .withTableConfiguration(this.getTableConfiguration()).seekToBeginning().build();
try {
Key first = openReader.getFirstKey();
Key last = openReader.getLastKey();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
index d66863e..3874d95 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
@@ -149,7 +149,8 @@ public class TabletData {
dataFiles.put(ref, dfv);
FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
- FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), null, conf);
+ FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder().forFile(path.toString(), ns, ns.getConf()).withTableConfiguration(conf)
+ .seekToBeginning().build();
long maxTime = -1;
try {
while (reader.hasTop()) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java b/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
index 2f90d7e..018717a 100644
--- a/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
@@ -98,8 +98,9 @@ public class BulkImportMonitoringIT extends ConfigurableMacBase {
fs.mkdirs(bulkFailures);
fs.mkdirs(files);
for (int i = 0; i < 10; i++) {
- FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(), null,
- AccumuloConfiguration.getDefaultConfiguration());
+ FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder()
+ .forFile(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf())
+ .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
writer.startDefaultLocalityGroup();
for (int j = 0x100; j < 0xfff; j += 3) {
writer.append(new Key(Integer.toHexString(j)), new Value(new byte[0]));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java b/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
index 3929dc7..2a05f4e 100644
--- a/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
+++ b/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
@@ -69,7 +69,7 @@ public class CreateRandomRFile {
FileSKVWriter mfw;
try {
FileSystem fs = FileSystem.get(conf);
- mfw = new RFileOperations().openWriter(file, fs, conf, null, AccumuloConfiguration.getDefaultConfiguration());
+ mfw = new RFileOperations().newWriterBuilder().forFile(file, fs, conf).withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java b/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
index 21b2b78..16a3122 100644
--- a/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
+++ b/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
@@ -58,7 +58,8 @@ public class GenerateSequentialRFile implements Runnable {
final Configuration conf = new Configuration();
Path p = new Path(opts.filePath);
final FileSystem fs = p.getFileSystem(conf);
- FileSKVWriter writer = FileOperations.getInstance().openWriter(opts.filePath, fs, conf, null, DefaultConfiguration.getInstance());
+ FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder().forFile(opts.filePath, fs, conf)
+ .withTableConfiguration(DefaultConfiguration.getInstance()).build();
writer.startDefaultLocalityGroup();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java b/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
index b042d97..c3aba52 100644
--- a/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
@@ -123,8 +123,9 @@ public class GetFileInfoBulkIT extends ConfigurableMacBase {
fs.mkdirs(bulkFailures);
fs.mkdirs(files);
for (int i = 0; i < 100; i++) {
- FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(), null,
- AccumuloConfiguration.getDefaultConfiguration());
+ FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder()
+ .forFile(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf())
+ .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
writer.startDefaultLocalityGroup();
for (int j = 0x100; j < 0xfff; j += 3) {
writer.append(new Key(Integer.toHexString(j)), new Value(new byte[0]));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
index 0f0c292..357a8fb 100644
--- a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
@@ -1338,9 +1338,9 @@ public class ShellServerIT extends SharedMiniClusterBase {
assertTrue(errorsDir.mkdir());
fs.mkdirs(new Path(errorsDir.toString()));
AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration();
- FileSKVWriter evenWriter = FileOperations.getInstance().openWriter(even, fs, conf, null, aconf);
+ FileSKVWriter evenWriter = FileOperations.getInstance().newWriterBuilder().forFile(even, fs, conf).withTableConfiguration(aconf).build();
evenWriter.startDefaultLocalityGroup();
- FileSKVWriter oddWriter = FileOperations.getInstance().openWriter(odd, fs, conf, null, aconf);
+ FileSKVWriter oddWriter = FileOperations.getInstance().newWriterBuilder().forFile(odd, fs, conf).withTableConfiguration(aconf).build();
oddWriter.startDefaultLocalityGroup();
long timestamp = System.currentTimeMillis();
Text cf = new Text("cf");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/test/src/main/java/org/apache/accumulo/test/TestIngest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
index 8e196c4..7d19253 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@ -218,8 +218,8 @@ public class TestIngest {
if (opts.outputFile != null) {
Configuration conf = CachedConfiguration.getInstance();
- writer = FileOperations.getInstance()
- .openWriter(opts.outputFile + "." + RFile.EXTENSION, fs, conf, null, AccumuloConfiguration.getDefaultConfiguration());
+ writer = FileOperations.getInstance().newWriterBuilder().forFile(opts.outputFile + "." + RFile.EXTENSION, fs, conf)
+ .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
writer.startDefaultLocalityGroup();
} else {
bw = connector.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
index 9c2ab55..bf248e6 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
@@ -74,17 +74,20 @@ public class BulkFileIT extends AccumuloClusterHarness {
fs.delete(new Path(dir), true);
- FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION, fs, conf, null, aconf);
+ FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder().forFile(dir + "/f1." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf)
+ .build();
writer1.startDefaultLocalityGroup();
writeData(writer1, 0, 333);
writer1.close();
- FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION, fs, conf, null, aconf);
+ FileSKVWriter writer2 = FileOperations.getInstance().newWriterBuilder().forFile(dir + "/f2." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf)
+ .build();
writer2.startDefaultLocalityGroup();
writeData(writer2, 334, 999);
writer2.close();
- FileSKVWriter writer3 = FileOperations.getInstance().openWriter(dir + "/f3." + RFile.EXTENSION, fs, conf, null, aconf);
+ FileSKVWriter writer3 = FileOperations.getInstance().newWriterBuilder().forFile(dir + "/f3." + RFile.EXTENSION, fs, conf).withTableConfiguration(aconf)
+ .build();
writer3.startDefaultLocalityGroup();
writeData(writer3, 1000, 1999);
writer3.close();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
index 93ebdb9..44eee4c 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
@@ -193,8 +193,8 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
Configuration conf = CachedConfiguration.getInstance();
DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
- FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, null, acuconf)
- .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
+ FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder().forFile(files[0].toString(), FileSystem.get(conf), conf)
+ .withTableConfiguration(acuconf).build().getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
assertNotNull(sample);
} else {
assertEquals(0, files.length);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
index 2b172c0..2e03658 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
@@ -205,8 +205,8 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
Configuration conf = CachedConfiguration.getInstance();
DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
- FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, null, acuconf)
- .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
+ FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder().forFile(files[0].toString(), FileSystem.get(conf), conf)
+ .withTableConfiguration(acuconf).build().getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
assertNotNull(sample);
} else {
assertEquals(0, files.length);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java b/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
index 9db6dd1..a74cb6b 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
@@ -90,8 +90,8 @@ public class FastBulkImportIT extends ConfigurableMacBase {
fs.mkdirs(bulkFailures);
fs.mkdirs(files);
for (int i = 0; i < 100; i++) {
- FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(), null,
- AccumuloConfiguration.getDefaultConfiguration());
+ FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder().forFile(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf())
+ .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
writer.startDefaultLocalityGroup();
for (int j = 0x100; j < 0xfff; j += 3) {
writer.append(new Key(Integer.toHexString(j)), new Value(new byte[0]));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index fac8c16..69cf23c 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -447,7 +447,8 @@ public class CollectTabletStats {
for (FileRef file : files) {
FileSystem ns = fs.getVolumeByPath(file.path()).getFileSystem();
- FileSKVIterator reader = FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), null, aconf);
+ FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder().forFile(file.path().toString(), ns, ns.getConf()).withTableConfiguration(aconf)
+ .build();
Range range = new Range(ke.getPrevEndRow(), false, ke.getEndRow(), true);
reader.seek(range, columnSet, columnSet.size() == 0 ? false : true);
while (reader.hasTop() && !range.afterEndKey(reader.getTopKey())) {
@@ -477,7 +478,8 @@ public class CollectTabletStats {
for (FileRef file : files) {
FileSystem ns = fs.getVolumeByPath(file.path()).getFileSystem();
- readers.add(FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), null, aconf.getConfiguration()));
+ readers.add(FileOperations.getInstance().newReaderBuilder().forFile(file.path().toString(), ns, ns.getConf())
+ .withTableConfiguration(aconf.getConfiguration()).build());
}
List<IterInfo> emptyIterinfo = Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4a79d586/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java b/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
index 3ac5b51..a972443 100644
--- a/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
@@ -2077,7 +2077,8 @@ public abstract class SimpleProxyBase extends SharedMiniClusterBase {
// Write an RFile
String filename = dir + "/bulk/import/rfile.rf";
- FileSKVWriter writer = FileOperations.getInstance().openWriter(filename, fs, fs.getConf(), null, DefaultConfiguration.getInstance());
+ FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder().forFile(filename, fs, fs.getConf())
+ .withTableConfiguration(DefaultConfiguration.getInstance()).build();
writer.startDefaultLocalityGroup();
writer.append(new org.apache.accumulo.core.data.Key(new Text("a"), new Text("b"), new Text("c")), new Value("value".getBytes(UTF_8)));
writer.close();