You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2018/08/16 14:43:10 UTC
[accumulo] branch master updated: Greatly simplify FileOperations
(#604)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 097537b Greatly simplify FileOperations (#604)
097537b is described below
commit 097537b16110b1c6e339a48efb65b8539ede9e24
Author: Mike Miller <mm...@apache.org>
AuthorDate: Thu Aug 16 10:43:07 2018 -0400
Greatly simplify FileOperations (#604)
---
.../core/client/rfile/RFileWriterBuilder.java | 14 +-
.../accumulo/core/file/DispatchingFileFactory.java | 32 +-
.../apache/accumulo/core/file/FileOperations.java | 616 +++++++++------------
.../accumulo/core/file/map/MapFileOperations.java | 10 +-
.../accumulo/core/file/rfile/RFileOperations.java | 12 +-
.../accumulo/core/client/rfile/RFileTest.java | 4 +-
6 files changed, 280 insertions(+), 408 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
index 20636f8..5a73c1a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java
@@ -104,16 +104,12 @@ class RFileWriterBuilder implements RFile.OutputArguments, RFile.WriterFSOptions
} else {
fsdo = new FSDataOutputStream(out.getOutputStream(), new FileSystem.Statistics("foo"));
}
- return new RFileWriter(
- fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf())
- .withTableConfiguration(acuconf).setAccumuloStartEnabled(false).build(),
- visCacheSize);
+ return new RFileWriter(fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf())
+ .withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize);
} else {
- return new RFileWriter(
- fileops.newWriterBuilder()
- .forFile(out.path.toString(), out.getFileSystem(), out.getConf())
- .withTableConfiguration(acuconf).setAccumuloStartEnabled(false).build(),
- visCacheSize);
+ return new RFileWriter(fileops.newWriterBuilder()
+ .forFile(out.path.toString(), out.getFileSystem(), out.getConf())
+ .withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize);
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
index 81348fc..6c65e42 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path;
class DispatchingFileFactory extends FileOperations {
- private FileOperations findFileFactory(FileAccessOperation<?> options) {
+ private FileOperations findFileFactory(FileOptions options) {
String file = options.getFilename();
Path p = new Path(file);
@@ -55,28 +55,15 @@ class DispatchingFileFactory extends FileOperations {
}
}
- /**
- * If the table configuration disallows caching, rewrite the options object to not pass the
- * caches.
- */
- private static <T extends FileReaderOperation<T>> T selectivelyDisableCaches(T input) {
- if (!input.getTableConfiguration().getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) {
- input = input.withIndexCache(null);
- }
- if (!input.getTableConfiguration().getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) {
- input = input.withDataCache(null);
- }
- return input;
- }
-
@Override
- protected long getFileSize(GetFileSizeOperation options) throws IOException {
+ protected long getFileSize(FileOptions options) throws IOException {
return findFileFactory(options).getFileSize(options);
}
@Override
- protected FileSKVWriter openWriter(OpenWriterOperation options) throws IOException {
- FileSKVWriter writer = findFileFactory(options).openWriter(options);
+ protected FileSKVWriter openWriter(FileOptions options) throws IOException {
+ FileOperations fileOps = new RFileOperations();
+ FileSKVWriter writer = fileOps.openWriter(options);
if (options.getTableConfiguration().getBoolean(Property.TABLE_BLOOM_ENABLED)) {
writer = new BloomFilterLayer.Writer(writer, options.getTableConfiguration(),
options.isAccumuloStartEnabled());
@@ -87,14 +74,12 @@ class DispatchingFileFactory extends FileOperations {
}
@Override
- protected FileSKVIterator openIndex(OpenIndexOperation options) throws IOException {
- options = selectivelyDisableCaches(options);
+ protected FileSKVIterator openIndex(FileOptions options) throws IOException {
return findFileFactory(options).openIndex(options);
}
@Override
- protected FileSKVIterator openReader(OpenReaderOperation options) throws IOException {
- options = selectivelyDisableCaches(options);
+ protected FileSKVIterator openReader(FileOptions options) throws IOException {
FileSKVIterator iter = findFileFactory(options).openReader(options);
if (options.getTableConfiguration().getBoolean(Property.TABLE_BLOOM_ENABLED)) {
return new BloomFilterLayer.Reader(iter, options.getTableConfiguration());
@@ -104,8 +89,7 @@ class DispatchingFileFactory extends FileOperations {
}
@Override
- protected FileSKVIterator openScanReader(OpenScanReaderOperation options) throws IOException {
- options = selectivelyDisableCaches(options);
+ protected FileSKVIterator openScanReader(FileOptions options) throws IOException {
return findFileFactory(options).openScanReader(options);
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index caccea7..051f774 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -58,35 +58,21 @@ public abstract class FileOperations {
// Abstract methods (to be implemented by subclasses)
//
- protected abstract long getFileSize(GetFileSizeOperation options) throws IOException;
+ protected abstract long getFileSize(FileOptions options) throws IOException;
- protected abstract FileSKVWriter openWriter(OpenWriterOperation options) throws IOException;
+ protected abstract FileSKVWriter openWriter(FileOptions options) throws IOException;
- protected abstract FileSKVIterator openIndex(OpenIndexOperation options) throws IOException;
+ protected abstract FileSKVIterator openIndex(FileOptions options) throws IOException;
- protected abstract FileSKVIterator openScanReader(OpenScanReaderOperation options)
- throws IOException;
+ protected abstract FileSKVIterator openScanReader(FileOptions options) throws IOException;
- protected abstract FileSKVIterator openReader(OpenReaderOperation options) throws IOException;
+ protected abstract FileSKVIterator openReader(FileOptions options) throws IOException;
//
// File operations
//
/**
- * Construct an operation object allowing one to query the size of a file. <br>
- * Syntax:
- *
- * <pre>
- * long size = fileOperations.getFileSize().forFile(filename, fileSystem, fsConfiguration)
- * .withTableConfiguration(tableConf).execute();
- * </pre>
- */
- public NeedsFile<GetFileSizeOperationBuilder> getFileSize() {
- return new GetFileSizeOperation();
- }
-
- /**
* Construct an operation object allowing one to create a writer for a file. <br>
* Syntax:
*
@@ -99,8 +85,8 @@ public abstract class FileOperations {
* .build();
* </pre>
*/
- public NeedsFileOrOuputStream<OpenWriterOperationBuilder> newWriterBuilder() {
- return new OpenWriterOperation();
+ public WriterBuilder newWriterBuilder() {
+ return new WriterBuilder();
}
/**
@@ -116,8 +102,8 @@ public abstract class FileOperations {
* .build();
* </pre>
*/
- public NeedsFile<OpenIndexOperationBuilder> newIndexReaderBuilder() {
- return new OpenIndexOperation();
+ public IndexReaderBuilder newIndexReaderBuilder() {
+ return new IndexReaderBuilder();
}
/**
@@ -139,11 +125,8 @@ public abstract class FileOperations {
* </pre>
*/
@SuppressWarnings("unchecked")
- public NeedsFile<NeedsRange<OpenScanReaderOperationBuilder>> newScanReaderBuilder() {
- // @formatter:off
- return (NeedsFile<NeedsRange<OpenScanReaderOperationBuilder>>)
- (NeedsFile<?>) new OpenScanReaderOperation();
- // @formatter:on
+ public ScanReaderBuilder newScanReaderBuilder() {
+ return new ScanReaderBuilder();
}
/**
@@ -162,68 +145,58 @@ public abstract class FileOperations {
* .build();
* </pre>
*/
- public NeedsFile<OpenReaderOperationBuilder> newReaderBuilder() {
- return new OpenReaderOperation();
+ public ReaderBuilder newReaderBuilder() {
+ return new ReaderBuilder();
}
- //
- // Domain specific embedded language for execution of operations.
- //
- // Here, for each ...Operation class which is a POJO holding a group of parameters,
- // we have a parallel ...OperationBuilder interface which only exposes the setters / execute
- // methods.
- // This allows us to expose only the setter/execute methods to upper layers, while
- // allowing lower layers the freedom to both get and set.
- //
-
- /**
- * Options common to all FileOperations.
- */
- // @formatter:off
- protected static class
- FileAccessOperation<SubclassType extends FileAccessOperation<SubclassType>> {
- // @formatter:on
- private AccumuloConfiguration tableConfiguration;
-
- private String filename;
- private FileSystem fs;
- private Configuration fsConf;
-
- /** Specify the table configuration defining access to this file. */
- @SuppressWarnings("unchecked")
- public SubclassType withTableConfiguration(AccumuloConfiguration tableConfiguration) {
+ public class FileOptions {
+ // objects used by all
+ public final AccumuloConfiguration tableConfiguration;
+ public final String filename;
+ public final FileSystem fs;
+ public final Configuration fsConf;
+ public final RateLimiter rateLimiter;
+ // writer only objects
+ public final String compression;
+ public final FSDataOutputStream outputStream;
+ public final boolean enableAccumuloStart;
+ // reader only objects
+ public final BlockCache dataCache;
+ public final BlockCache indexCache;
+ public final Cache<String,Long> fileLenCache;
+ public final boolean seekToBeginning;
+ public final CryptoService cryptoService;
+ // scan reader only objects
+ public final Range range;
+ public final Set<ByteSequence> columnFamilies;
+ public final boolean inclusive;
+
+ public FileOptions(AccumuloConfiguration tableConfiguration, String filename, FileSystem fs,
+ Configuration fsConf, RateLimiter rateLimiter, String compression,
+ FSDataOutputStream outputStream, boolean enableAccumuloStart, BlockCache dataCache,
+ BlockCache indexCache, Cache<String,Long> fileLenCache, boolean seekToBeginning,
+ CryptoService cryptoService, Range range, Set<ByteSequence> columnFamilies,
+ boolean inclusive) {
this.tableConfiguration = tableConfiguration;
- return (SubclassType) this;
- }
-
- /** Specify the file this operation should apply to. */
- @SuppressWarnings("unchecked")
- public SubclassType forFile(String filename, FileSystem fs, Configuration fsConf) {
this.filename = filename;
this.fs = fs;
this.fsConf = fsConf;
- return (SubclassType) this;
- }
-
- /** Specify the file this operation should apply to. */
- @SuppressWarnings("unchecked")
- public SubclassType forFile(String filename) {
- this.filename = filename;
- return (SubclassType) this;
- }
-
- /**
- * Specify the filesystem which this operation should apply to, along with its configuration.
- */
- @SuppressWarnings("unchecked")
- public SubclassType inFileSystem(FileSystem fs, Configuration fsConf) {
- this.fs = fs;
- this.fsConf = fsConf;
- return (SubclassType) this;
+ this.rateLimiter = rateLimiter;
+ this.compression = compression;
+ this.outputStream = outputStream;
+ this.enableAccumuloStart = enableAccumuloStart;
+ this.dataCache = dataCache;
+ this.indexCache = indexCache;
+ this.fileLenCache = fileLenCache;
+ this.seekToBeginning = seekToBeginning;
+ this.cryptoService = cryptoService;
+ this.range = range;
+ this.columnFamilies = columnFamilies;
+ this.inclusive = inclusive;
}
- protected void setFilename(String filename) {
- this.filename = filename;
+ public AccumuloConfiguration getTableConfiguration() {
+ return tableConfiguration;
}
public String getFilename() {
@@ -234,283 +207,314 @@ public abstract class FileOperations {
return fs;
}
- protected void setConfiguration(Configuration fsConf) {
- this.fsConf = fsConf;
- }
-
public Configuration getConfiguration() {
return fsConf;
}
- public AccumuloConfiguration getTableConfiguration() {
- return tableConfiguration;
+ public RateLimiter getRateLimiter() {
+ return rateLimiter;
}
- /** Check for null parameters. */
- protected void validate() {
- Objects.requireNonNull(getFilename());
- Objects.requireNonNull(getFileSystem());
- Objects.requireNonNull(getConfiguration());
- Objects.requireNonNull(getTableConfiguration());
+ public String getCompression() {
+ return compression;
}
- }
- /** Builder interface parallel to {@link FileAccessOperation}. */
- protected interface FileAccessOperationBuilder<SubbuilderType> extends NeedsFile<SubbuilderType>,
- NeedsFileSystem<SubbuilderType>, NeedsTableConfiguration<SubbuilderType> {
- // no optional/generic methods.
- }
+ public FSDataOutputStream getOutputStream() {
+ return outputStream;
+ }
- /**
- * Operation object for performing {@code getFileSize()} operations.
- */
- protected class GetFileSizeOperation extends FileAccessOperation<GetFileSizeOperation>
- implements GetFileSizeOperationBuilder {
- /** Return the size of the file. */
- @Override
- public long execute() throws IOException {
- validate();
- return getFileSize(this);
+ public boolean isAccumuloStartEnabled() {
+ return enableAccumuloStart;
}
- }
- /**
- * Builder interface for {@link GetFileSizeOperation}, allowing execution of {@code getFileSize()}
- * operations.
- */
- public interface GetFileSizeOperationBuilder
- extends FileAccessOperationBuilder<GetFileSizeOperationBuilder> {
- /** Return the size of the file. */
- long execute() throws IOException;
+ public BlockCache getDataCache() {
+ return dataCache;
+ }
+
+ public BlockCache getIndexCache() {
+ return indexCache;
+ }
+
+ public Cache<String,Long> getFileLenCache() {
+ return fileLenCache;
+ }
+
+ public boolean isSeekToBeginning() {
+ return seekToBeginning;
+ }
+
+ public CryptoService getCryptoService() {
+ return cryptoService;
+ }
+
+ public Range getRange() {
+ return range;
+ }
+
+ public Set<ByteSequence> getColumnFamilies() {
+ return columnFamilies;
+ }
+
+ public boolean isRangeInclusive() {
+ return inclusive;
+ }
}
/**
- * Options common to all {@code FileOperation}s which perform reading or writing.
+ * Helper class extended by both writers and readers.
*/
- protected static class FileIOOperation<SubclassType extends FileIOOperation<SubclassType>>
- extends FileAccessOperation<SubclassType> {
- private RateLimiter rateLimiter;
+ public class FileHelper {
+ protected AccumuloConfiguration tableConfiguration;
+ protected String filename;
+ protected FileSystem fs;
+ protected Configuration fsConf;
+ protected RateLimiter rateLimiter;
+
+ protected FileHelper fs(FileSystem fs) {
+ Objects.requireNonNull(fs);
+ this.fs = fs;
+ return this;
+ }
+
+ protected FileHelper fsConf(Configuration fsConf) {
+ Objects.requireNonNull(fsConf);
+ this.fsConf = fsConf;
+ return this;
+ }
- /** Specify a rate limiter for this operation. */
- @SuppressWarnings("unchecked")
- public SubclassType withRateLimiter(RateLimiter rateLimiter) {
+ protected FileHelper filename(String filename) {
+ Objects.requireNonNull(filename);
+ this.filename = filename;
+ return this;
+ }
+
+ protected FileHelper tableConfiguration(AccumuloConfiguration tableConfiguration) {
+ Objects.requireNonNull(tableConfiguration);
+ this.tableConfiguration = tableConfiguration;
+ return this;
+ }
+
+ protected FileHelper rateLimiter(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
- return (SubclassType) this;
+ return this;
}
- public RateLimiter getRateLimiter() {
- return rateLimiter;
+ protected FileOptions toWriterBuilderOptions(String compression,
+ FSDataOutputStream outputStream, boolean startEnabled) {
+ return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, compression,
+ outputStream, startEnabled, null, null, null, false, null, null, null, true);
+ }
+
+ protected FileOptions toReaderBuilderOptions(BlockCache dataCache, BlockCache indexCache,
+ Cache<String,Long> fileLenCache, boolean seekToBeginning, CryptoService cryptoService) {
+ return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null,
+ false, dataCache, indexCache, fileLenCache, seekToBeginning, cryptoService, null, null,
+ true);
+ }
+
+ protected FileOptions toIndexReaderBuilderOptions() {
+ return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null,
+ false, null, null, null, false, null, null, null, true);
}
- }
- /** Builder interface parallel to {@link FileIOOperation}. */
- protected interface FileIOOperationBuilder<SubbuilderType>
- extends FileAccessOperationBuilder<SubbuilderType> {
- /** Specify a rate limiter for this operation. */
- SubbuilderType withRateLimiter(RateLimiter rateLimiter);
+ protected FileOptions toScanReaderBuilderOptions(Range range, Set<ByteSequence> columnFamilies,
+ boolean inclusive) {
+ return new FileOptions(tableConfiguration, filename, fs, fsConf, rateLimiter, null, null,
+ false, null, null, null, false, null, range, columnFamilies, inclusive);
+ }
}
/**
* Operation object for constructing a writer.
*/
- protected class OpenWriterOperation extends FileIOOperation<OpenWriterOperation>
- implements OpenWriterOperationBuilder, NeedsFileOrOuputStream<OpenWriterOperationBuilder> {
+ public class WriterBuilder extends FileHelper implements WriterTableConfiguration {
private String compression;
private FSDataOutputStream outputStream;
private boolean enableAccumuloStart = true;
- @Override
- public NeedsTableConfiguration<OpenWriterOperationBuilder> forOutputStream(String extenstion,
+ public WriterTableConfiguration forOutputStream(String extension,
FSDataOutputStream outputStream, Configuration fsConf) {
this.outputStream = outputStream;
- setConfiguration(fsConf);
- setFilename("foo" + extenstion);
+ filename("foo" + extension).fsConf(fsConf);
return this;
}
- public boolean isAccumuloStartEnabled() {
- return enableAccumuloStart;
- }
-
- @Override
- public OpenWriterOperation setAccumuloStartEnabled(boolean enableAccumuloStart) {
- this.enableAccumuloStart = enableAccumuloStart;
+ public WriterTableConfiguration forFile(String filename, FileSystem fs, Configuration fsConf) {
+ filename(filename).fs(fs).fsConf(fsConf);
return this;
}
- @Override
- public OpenWriterOperation withCompression(String compression) {
- this.compression = compression;
+ public WriterBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration) {
+ tableConfiguration(tableConfiguration);
return this;
}
- public String getCompression() {
- return compression;
+ public WriterBuilder withStartDisabled() {
+ this.enableAccumuloStart = false;
+ return this;
}
- public FSDataOutputStream getOutputStream() {
- return outputStream;
+ public WriterBuilder withCompression(String compression) {
+ this.compression = compression;
+ return this;
}
- @Override
- protected void validate() {
- if (outputStream == null) {
- super.validate();
- } else {
- Objects.requireNonNull(getConfiguration());
- Objects.requireNonNull(getTableConfiguration());
- }
+ public WriterBuilder withRateLimiter(RateLimiter rateLimiter) {
+ rateLimiter(rateLimiter);
+ return this;
}
- @Override
public FileSKVWriter build() throws IOException {
- validate();
- return openWriter(this);
+ return openWriter(toWriterBuilderOptions(compression, outputStream, enableAccumuloStart));
}
}
- /** Builder interface parallel to {@link OpenWriterOperation}. */
- public interface OpenWriterOperationBuilder
- extends FileIOOperationBuilder<OpenWriterOperationBuilder> {
- /** Set the compression type. */
- OpenWriterOperationBuilder withCompression(String compression);
-
- /**
- * Classes may be instantiated as part of a write operation. For example if BloomFilters,
- * Samplers, or Summarizers are used then classes are loaded. When running in a tserver,
- * Accumulo start should be used to load classes. When running in a client process, Accumulo
- * start should not be used. This method makes it possible to specify if Accumulo Start should
- * be used to load classes. Calling this method is optional and the default is true.
- */
- OpenWriterOperationBuilder setAccumuloStartEnabled(boolean enableAccumuloStart);
-
- /** Construct the writer. */
- FileSKVWriter build() throws IOException;
+ public interface WriterTableConfiguration {
+ public WriterBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration);
}
/**
* Options common to all {@code FileOperations} which perform reads.
*/
- protected static class FileReaderOperation<SubclassType extends FileReaderOperation<SubclassType>>
- extends FileIOOperation<SubclassType> {
+ public class ReaderBuilder extends FileHelper implements ReaderTableConfiguration {
private BlockCache dataCache;
private BlockCache indexCache;
private Cache<String,Long> fileLenCache;
+ private boolean seekToBeginning = false;
private CryptoService cryptoService;
+ public ReaderTableConfiguration forFile(String filename, FileSystem fs, Configuration fsConf) {
+ filename(filename).fs(fs).fsConf(fsConf);
+ return this;
+ }
+
+ public ReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration) {
+ tableConfiguration(tableConfiguration);
+ return this;
+ }
+
/**
* (Optional) Set the block cache pair to be used to optimize reads within the constructed
* reader.
*/
- @SuppressWarnings("unchecked")
- public SubclassType withBlockCache(BlockCache dataCache, BlockCache indexCache) {
+ public ReaderBuilder withBlockCache(BlockCache dataCache, BlockCache indexCache) {
this.dataCache = dataCache;
this.indexCache = indexCache;
- return (SubclassType) this;
+ return this;
}
/** (Optional) set the data cache to be used to optimize reads within the constructed reader. */
- @SuppressWarnings("unchecked")
- public SubclassType withDataCache(BlockCache dataCache) {
+ public ReaderBuilder withDataCache(BlockCache dataCache) {
this.dataCache = dataCache;
- return (SubclassType) this;
+ return this;
}
/**
* (Optional) set the index cache to be used to optimize reads within the constructed reader.
*/
- @SuppressWarnings("unchecked")
- public SubclassType withIndexCache(BlockCache indexCache) {
+ public ReaderBuilder withIndexCache(BlockCache indexCache) {
this.indexCache = indexCache;
- return (SubclassType) this;
+ return this;
}
- @SuppressWarnings("unchecked")
- public SubclassType withFileLenCache(Cache<String,Long> fileLenCache) {
+ public ReaderBuilder withFileLenCache(Cache<String,Long> fileLenCache) {
this.fileLenCache = fileLenCache;
- return (SubclassType) this;
+ return this;
}
- public SubclassType withCryptoService(CryptoService cryptoService) {
+ public ReaderBuilder withCryptoService(CryptoService cryptoService) {
this.cryptoService = cryptoService;
- return (SubclassType) this;
+ return this;
}
- public BlockCache getDataCache() {
- return dataCache;
+ public ReaderBuilder withRateLimiter(RateLimiter rateLimiter) {
+ rateLimiter(rateLimiter);
+ return this;
}
- public BlockCache getIndexCache() {
- return indexCache;
+ /**
+ * Seek the constructed iterator to the beginning of its domain before returning. Equivalent to
+ * {@code seekToBeginning(true)}.
+ */
+ public ReaderBuilder seekToBeginning() {
+ seekToBeginning(true);
+ return this;
}
- public Cache<String,Long> getFileLenCache() {
- return fileLenCache;
+ /** If true, seek the constructed iterator to the beginning of its domain before returning. */
+ public ReaderBuilder seekToBeginning(boolean seekToBeginning) {
+ this.seekToBeginning = seekToBeginning;
+ return this;
}
- public CryptoService getCryptoService() {
- return cryptoService;
+ /** Execute the operation, constructing the specified file reader. */
+ public FileSKVIterator build() throws IOException {
+ /**
+ * If the table configuration disallows caching, rewrite the options object to not pass the
+ * caches.
+ */
+ if (!tableConfiguration.getBoolean(Property.TABLE_INDEXCACHE_ENABLED)) {
+ withIndexCache(null);
+ }
+ if (!tableConfiguration.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED)) {
+ withDataCache(null);
+ }
+ return openReader(toReaderBuilderOptions(dataCache, indexCache, fileLenCache, seekToBeginning,
+ cryptoService));
}
}
- /** Builder interface parallel to {@link FileReaderOperation}. */
- protected interface FileReaderOperationBuilder<SubbuilderType>
- extends FileIOOperationBuilder<SubbuilderType> {
- /**
- * (Optional) Set the block cache pair to be used to optimize reads within the constructed
- * reader.
- */
- SubbuilderType withBlockCache(BlockCache dataCache, BlockCache indexCache);
-
- /** (Optional) set the data cache to be used to optimize reads within the constructed reader. */
- SubbuilderType withDataCache(BlockCache dataCache);
-
- /**
- * (Optional) set the index cache to be used to optimize reads within the constructed reader.
- */
- SubbuilderType withIndexCache(BlockCache indexCache);
-
- /**
- * (Optional) set the file len cache to be used to optimize reads within the constructed reader.
- */
- SubbuilderType withFileLenCache(Cache<String,Long> fileLenCache);
-
- /**
- * (Optional) set the crypto service to be used within the constructed reader.
- */
- SubbuilderType withCryptoService(CryptoService cryptoService);
+ public interface ReaderTableConfiguration {
+ ReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration);
}
/**
* Operation object for opening an index.
*/
- protected class OpenIndexOperation extends FileReaderOperation<OpenIndexOperation>
- implements OpenIndexOperationBuilder {
- @Override
+ public class IndexReaderBuilder extends FileHelper implements IndexReaderTableConfiguration {
+
+ public IndexReaderTableConfiguration forFile(String filename, FileSystem fs,
+ Configuration fsConf) {
+ filename(filename).fs(fs).fsConf(fsConf);
+ return this;
+ }
+
+ public IndexReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration) {
+ tableConfiguration(tableConfiguration);
+ return this;
+ }
+
public FileSKVIterator build() throws IOException {
- validate();
- return openIndex(this);
+ return openIndex(toIndexReaderBuilderOptions());
}
}
- /** Builder interface parallel to {@link OpenIndexOperation}. */
- public interface OpenIndexOperationBuilder
- extends FileReaderOperationBuilder<OpenIndexOperationBuilder> {
- /** Construct the reader. */
- FileSKVIterator build() throws IOException;
+ public interface IndexReaderTableConfiguration {
+ IndexReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration);
}
/** Operation object for opening a scan reader. */
- protected class OpenScanReaderOperation extends FileReaderOperation<OpenScanReaderOperation>
- implements OpenScanReaderOperationBuilder {
+ public class ScanReaderBuilder extends FileHelper implements ScanReaderTableConfiguration {
private Range range;
private Set<ByteSequence> columnFamilies;
private boolean inclusive;
+ public ScanReaderTableConfiguration forFile(String filename, FileSystem fs,
+ Configuration fsConf) {
+ filename(filename).fs(fs).fsConf(fsConf);
+ return this;
+ }
+
+ public ScanReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration) {
+ tableConfiguration(tableConfiguration);
+ return this;
+ }
+
/** Set the range over which the constructed iterator will search. */
- @Override
- public OpenScanReaderOperation overRange(Range range, Set<ByteSequence> columnFamilies,
+ public ScanReaderBuilder overRange(Range range, Set<ByteSequence> columnFamilies,
boolean inclusive) {
+ Objects.requireNonNull(range);
+ Objects.requireNonNull(columnFamilies);
this.range = range;
this.columnFamilies = columnFamilies;
this.inclusive = inclusive;
@@ -527,125 +531,13 @@ public abstract class FileOperations {
return columnFamilies;
}
- public boolean isRangeInclusive() {
- return inclusive;
- }
-
- @Override
- protected void validate() {
- super.validate();
- Objects.requireNonNull(range);
- Objects.requireNonNull(columnFamilies);
- }
-
- /** Execute the operation, constructing a scan iterator. */
- @Override
- public FileSKVIterator build() throws IOException {
- validate();
- return openScanReader(this);
- }
- }
-
- /** Builder interface parallel to {@link OpenScanReaderOperation}. */
- public interface OpenScanReaderOperationBuilder
- extends FileReaderOperationBuilder<OpenScanReaderOperationBuilder>,
- NeedsRange<OpenScanReaderOperationBuilder> {
/** Execute the operation, constructing a scan iterator. */
- FileSKVIterator build() throws IOException;
- }
-
- /** Operation object for opening a full reader. */
- protected class OpenReaderOperation extends FileReaderOperation<OpenReaderOperation>
- implements OpenReaderOperationBuilder {
- private boolean seekToBeginning = false;
-
- /**
- * Seek the constructed iterator to the beginning of its domain before returning. Equivalent to
- * {@code seekToBeginning(true)}.
- */
- @Override
- public OpenReaderOperation seekToBeginning() {
- return seekToBeginning(true);
- }
-
- /** If true, seek the constructed iterator to the beginning of its domain before returning. */
- @Override
- public OpenReaderOperation seekToBeginning(boolean seekToBeginning) {
- this.seekToBeginning = seekToBeginning;
- return this;
- }
-
- public boolean isSeekToBeginning() {
- return seekToBeginning;
- }
-
- /** Execute the operation, constructing the specified file reader. */
- @Override
public FileSKVIterator build() throws IOException {
- validate();
- return openReader(this);
+ return openScanReader(toScanReaderBuilderOptions(range, columnFamilies, inclusive));
}
}
- /** Builder parallel to {@link OpenReaderOperation}. */
- public interface OpenReaderOperationBuilder
- extends FileReaderOperationBuilder<OpenReaderOperationBuilder> {
- /**
- * Seek the constructed iterator to the beginning of its domain before returning. Equivalent to
- * {@code seekToBeginning(true)}.
- */
- OpenReaderOperationBuilder seekToBeginning();
-
- /** If true, seek the constructed iterator to the beginning of its domain before returning. */
- OpenReaderOperationBuilder seekToBeginning(boolean seekToBeginning);
-
- /** Execute the operation, constructing the specified file reader. */
- FileSKVIterator build() throws IOException;
- }
-
- /**
- * Type wrapper to ensure that {@code forFile(...)} is called before other methods.
- */
- public interface NeedsFile<ReturnType> {
- /** Specify the file this operation should apply to. */
- NeedsTableConfiguration<ReturnType> forFile(String filename, FileSystem fs,
- Configuration fsConf);
-
- /** Specify the file this operation should apply to. */
- NeedsFileSystem<ReturnType> forFile(String filename);
- }
-
- public interface NeedsFileOrOuputStream<ReturnType> extends NeedsFile<ReturnType> {
- /** Specify the file this operation should apply to. */
- NeedsTableConfiguration<ReturnType> forOutputStream(String extenstion, FSDataOutputStream out,
- Configuration fsConf);
+ public interface ScanReaderTableConfiguration {
+ ScanReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration);
}
-
- /**
- * Type wrapper to ensure that {@code inFileSystem(...)} is called before other methods.
- */
- public interface NeedsFileSystem<ReturnType> {
- /**
- * Specify the {@link FileSystem} that this operation operates on, along with an alternate
- * configuration.
- */
- NeedsTableConfiguration<ReturnType> inFileSystem(FileSystem fs, Configuration fsConf);
- }
-
- /**
- * Type wrapper to ensure that {@code withTableConfiguration(...)} is called before other methods.
- */
- public interface NeedsTableConfiguration<ReturnType> {
- /** Specify the table configuration defining access to this file. */
- ReturnType withTableConfiguration(AccumuloConfiguration tableConfiguration);
- }
-
- /**
- * Type wrapper to ensure that {@code overRange(...)} is called before other methods.
- */
- public interface NeedsRange<ReturnType> {
- /** Set the range over which the constructed iterator will search. */
- ReturnType overRange(Range range, Set<ByteSequence> columnFamilies, boolean inclusive);
- }
-
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
index 9099b0a..7198efb 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
@@ -138,7 +138,7 @@ public class MapFileOperations extends FileOperations {
}
@Override
- protected FileSKVIterator openReader(OpenReaderOperation options) throws IOException {
+ protected FileSKVIterator openReader(FileOptions options) throws IOException {
FileSKVIterator iter = new RangeIterator(new MapFileIterator(options.getTableConfiguration(),
options.getFileSystem(), options.getFilename(), options.getConfiguration()));
if (options.isSeekToBeginning()) {
@@ -148,24 +148,24 @@ public class MapFileOperations extends FileOperations {
}
@Override
- protected FileSKVWriter openWriter(OpenWriterOperation options) throws IOException {
+ protected FileSKVWriter openWriter(FileOptions options) throws IOException {
throw new UnsupportedOperationException();
}
@Override
- protected FileSKVIterator openIndex(OpenIndexOperation options) throws IOException {
+ protected FileSKVIterator openIndex(FileOptions options) throws IOException {
return new SequenceFileIterator(MapFileUtil.openIndex(options.getConfiguration(),
options.getFileSystem(), new Path(options.getFilename())), false);
}
@Override
- protected long getFileSize(GetFileSizeOperation options) throws IOException {
+ protected long getFileSize(FileOptions options) throws IOException {
return options.getFileSystem()
.getFileStatus(new Path(options.getFilename() + "/" + MapFile.DATA_FILE_NAME)).getLen();
}
@Override
- protected FileSKVIterator openScanReader(OpenScanReaderOperation options) throws IOException {
+ protected FileSKVIterator openScanReader(FileOptions options) throws IOException {
MapFileIterator mfIter = new MapFileIterator(options.getTableConfiguration(),
options.getFileSystem(), options.getFilename(), options.getConfiguration());
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index fff8243..abe78f0 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -46,7 +46,7 @@ public class RFileOperations extends FileOperations {
private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
- private static RFile.Reader getReader(FileReaderOperation<?> options) throws IOException {
+ private static RFile.Reader getReader(FileOptions options) throws IOException {
CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(options.getFileSystem(),
new Path(options.getFilename()), options.getConfiguration(), options.getFileLenCache(),
options.getDataCache(), options.getIndexCache(), options.getRateLimiter(),
@@ -55,17 +55,17 @@ public class RFileOperations extends FileOperations {
}
@Override
- protected long getFileSize(GetFileSizeOperation options) throws IOException {
+ protected long getFileSize(FileOptions options) throws IOException {
return options.getFileSystem().getFileStatus(new Path(options.getFilename())).getLen();
}
@Override
- protected FileSKVIterator openIndex(OpenIndexOperation options) throws IOException {
+ protected FileSKVIterator openIndex(FileOptions options) throws IOException {
return getReader(options).getIndex();
}
@Override
- protected FileSKVIterator openReader(OpenReaderOperation options) throws IOException {
+ protected FileSKVIterator openReader(FileOptions options) throws IOException {
RFile.Reader reader = getReader(options);
if (options.isSeekToBeginning()) {
@@ -76,14 +76,14 @@ public class RFileOperations extends FileOperations {
}
@Override
- protected FileSKVIterator openScanReader(OpenScanReaderOperation options) throws IOException {
+ protected FileSKVIterator openScanReader(FileOptions options) throws IOException {
RFile.Reader reader = getReader(options);
reader.seek(options.getRange(), options.getColumnFamilies(), options.isRangeInclusive());
return reader;
}
@Override
- protected FileSKVWriter openWriter(OpenWriterOperation options) throws IOException {
+ protected FileSKVWriter openWriter(FileOptions options) throws IOException {
AccumuloConfiguration acuconf = options.getTableConfiguration();
diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
index 30f9df6..b1d0d37 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
@@ -809,8 +809,8 @@ public class RFileTest {
}
private Reader getReader(LocalFileSystem localFs, String testFile) throws IOException {
- return (Reader) FileOperations.getInstance().newReaderBuilder().forFile(testFile)
- .inFileSystem(localFs, localFs.getConf())
+ return (Reader) FileOperations.getInstance().newReaderBuilder()
+ .forFile(testFile, localFs, localFs.getConf())
.withTableConfiguration(DefaultConfiguration.getInstance()).build();
}