You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2018/05/18 21:14:37 UTC
[accumulo] branch master updated: Inline inner Rfile classes and
interfaces (#487)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 40cbc60 Inline inner Rfile classes and interfaces (#487)
40cbc60 is described below
commit 40cbc60ed7fcb527b1fa57d935baa25d8d47eae4
Author: Mike Miller <mm...@apache.org>
AuthorDate: Fri May 18 17:14:32 2018 -0400
Inline inner Rfile classes and interfaces (#487)
* Eliminated CachableBlockFile.Writer by inlining BCFile.Writer
* Made BCFile close its OutputStream
* Eliminated PositionedOutputs wrapping and PositionedOutput interface
* Made RateLimitedOutputStream extend DataOutputStream and take FSDataOutputStream
---
.../file/blockfile/impl/CachableBlockFile.java | 63 ---------------
.../accumulo/core/file/rfile/MultiLevelIndex.java | 4 +-
.../org/apache/accumulo/core/file/rfile/RFile.java | 20 ++---
.../accumulo/core/file/rfile/RFileOperations.java | 7 +-
.../accumulo/core/file/rfile/SplitLarge.java | 5 +-
.../accumulo/core/file/rfile/bcfile/BCFile.java | 93 ++++++++--------------
.../file/streams/PositionedDataOutputStream.java | 37 ---------
.../core/file/streams/PositionedOutput.java | 27 -------
.../core/file/streams/PositionedOutputs.java | 73 -----------------
.../core/file/streams/RateLimitedOutputStream.java | 14 ++--
.../core/file/rfile/CreateCompatTestFile.java | 6 +-
.../core/file/rfile/MultiLevelIndexTest.java | 5 +-
.../core/file/rfile/MultiThreadedRFileTest.java | 5 +-
.../apache/accumulo/core/file/rfile/RFileTest.java | 5 +-
.../file/streams/RateLimitedOutputStreamTest.java | 13 +--
15 files changed, 70 insertions(+), 307 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 43e9730..de3d88d 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -20,7 +20,6 @@ import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Map;
@@ -35,9 +34,7 @@ import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
-import org.apache.accumulo.core.file.streams.PositionedOutput;
import org.apache.accumulo.core.file.streams.RateLimitedInputStream;
-import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -58,57 +55,6 @@ public class CachableBlockFile {
private static final Logger log = LoggerFactory.getLogger(CachableBlockFile.class);
- public static class Writer implements Closeable {
- private BCFile.Writer _bc;
- private BCFile.Writer.BlockAppender _bw;
- private final PositionedOutput fsout;
- private long length = 0;
-
- public Writer(FileSystem fs, Path fName, String compressAlgor, RateLimiter writeLimiter,
- Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
- this(new RateLimitedOutputStream(fs.create(fName), writeLimiter), compressAlgor, conf,
- accumuloConfiguration);
- }
-
- public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fsout,
- String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration)
- throws IOException {
- this.fsout = fsout;
- init(fsout, compressAlgor, conf, accumuloConfiguration);
- }
-
- private <OutputStreamT extends OutputStream & PositionedOutput> void init(OutputStreamT fsout,
- String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration)
- throws IOException {
- _bc = new BCFile.Writer(fsout, compressAlgor, conf, false, accumuloConfiguration);
- }
-
- public BCFile.Writer.BlockAppender prepareMetaBlock(String name) throws IOException {
- _bw = _bc.prepareMetaBlock(name);
- return _bw;
- }
-
- public BCFile.Writer.BlockAppender prepareDataBlock() throws IOException {
- _bw = _bc.prepareDataBlock();
- return _bw;
- }
-
- @Override
- public void close() throws IOException {
-
- _bw.close();
- _bc.close();
-
- length = this.fsout.position();
- ((OutputStream) this.fsout).close();
- }
-
- public long getLength() throws IOException {
- return length;
- }
-
- }
-
private static interface IoeSupplier<T> {
T get() throws IOException;
}
@@ -482,15 +428,6 @@ public class CachableBlockFile {
indexable = true;
}
- /**
- * It is intended that the caller of this method will close the stream we also only intend that
- * this be called once per BlockRead. This method is provide for methods up stream that expect
- * to receive a DataInputStream object.
- */
- public DataInputStream getStream() {
- return this;
- }
-
public void seek(int position) {
seekableInput.seek(position);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
index 992a3ec..a71c79e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
@@ -507,9 +507,9 @@ public class MultiLevelIndex {
private boolean addedLast = false;
- private CachableBlockFile.Writer blockFileWriter;
+ private BCFile.Writer blockFileWriter;
- Writer(CachableBlockFile.Writer blockFileWriter, int maxBlockSize) {
+ Writer(BCFile.Writer blockFileWriter, int maxBlockSize) {
this.blockFileWriter = blockFileWriter;
this.threshold = maxBlockSize;
levels = new ArrayList<>();
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 7247b60..0c85066 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.file.rfile.BlockIndex.BlockIndexEntry;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
@@ -163,8 +164,7 @@ public class RFile {
this.version = version;
}
- public LocalityGroupMetadata(Set<ByteSequence> pcf, int indexBlockSize,
- CachableBlockFile.Writer bfw) {
+ public LocalityGroupMetadata(Set<ByteSequence> pcf, int indexBlockSize, BCFile.Writer bfw) {
isDefaultLG = true;
columnFamilies = new HashMap<>();
previousColumnFamilies = pcf;
@@ -174,7 +174,7 @@ public class RFile {
}
public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int indexBlockSize,
- CachableBlockFile.Writer bfw) {
+ BCFile.Writer bfw) {
this.name = name;
isDefaultLG = false;
columnFamilies = new HashMap<>();
@@ -422,7 +422,7 @@ public class RFile {
private static class LocalityGroupWriter {
- private CachableBlockFile.Writer fileWriter;
+ private BCFile.Writer fileWriter;
private BlockAppender blockWriter;
private final long blockSize;
@@ -441,7 +441,7 @@ public class RFile {
private RollingStats keyLenStats = new RollingStats(2017);
private double averageKeySize = 0;
- LocalityGroupWriter(CachableBlockFile.Writer fileWriter, long blockSize, long maxBlockSize,
+ LocalityGroupWriter(BCFile.Writer fileWriter, long blockSize, long maxBlockSize,
LocalityGroupMetadata currentLocalityGroup, SampleLocalityGroupWriter sample) {
this.fileWriter = fileWriter;
this.blockSize = blockSize;
@@ -552,7 +552,7 @@ public class RFile {
public static final int MAX_CF_IN_DLG = 1000;
private static final double MAX_BLOCK_MULTIPLIER = 1.1;
- private CachableBlockFile.Writer fileWriter;
+ private BCFile.Writer fileWriter;
private final long blockSize;
private final long maxBlockSize;
@@ -575,12 +575,12 @@ public class RFile {
private SamplerConfigurationImpl samplerConfig;
private Sampler sampler;
- public Writer(CachableBlockFile.Writer bfw, int blockSize) throws IOException {
+ public Writer(BCFile.Writer bfw, int blockSize) throws IOException {
this(bfw, blockSize, (int) DefaultConfiguration.getInstance()
.getAsBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX), null, null);
}
- public Writer(CachableBlockFile.Writer bfw, int blockSize, int indexBlockSize,
+ public Writer(BCFile.Writer bfw, int blockSize, int indexBlockSize,
SamplerConfigurationImpl samplerConfig, Sampler sampler) throws IOException {
this.blockSize = blockSize;
this.maxBlockSize = (long) (blockSize * MAX_BLOCK_MULTIPLIER);
@@ -662,7 +662,7 @@ public class RFile {
public DataOutputStream createMetaStore(String name) throws IOException {
closeData();
- return (DataOutputStream) fileWriter.prepareMetaBlock(name);
+ return fileWriter.prepareMetaBlock(name);
}
private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies)
@@ -1340,7 +1340,7 @@ public class RFile {
@Override
public DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException {
try {
- return this.reader.getMetaBlock(name).getStream();
+ return this.reader.getMetaBlock(name);
} catch (MetaBlockDoesNotExist e) {
throw new NoSuchMetaStoreException("name = " + name, e);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 0054db2..5d8705e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -30,7 +30,7 @@ import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
-import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.sample.impl.SamplerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -128,9 +128,8 @@ public class RFileOperations extends FileOperations {
outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
}
- CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(
- new RateLimitedOutputStream(outputStream, options.getRateLimiter()), compression, conf,
- acuconf);
+ BCFile.Writer _cbw = new BCFile.Writer(outputStream, options.getRateLimiter(), compression,
+ conf, acuconf);
return new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
index c3663cd..48f2873 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
import org.apache.accumulo.core.file.rfile.RFile.Writer;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -71,10 +72,10 @@ public class SplitLarge {
int blockSize = (int) aconf.getAsBytes(Property.TABLE_FILE_BLOCK_SIZE);
try (
Writer small = new RFile.Writer(
- new CachableBlockFile.Writer(fs, new Path(smallName), "gz", null, conf, aconf),
+ new BCFile.Writer(fs.create(new Path(smallName)), null, "gz", conf, aconf),
blockSize);
Writer large = new RFile.Writer(
- new CachableBlockFile.Writer(fs, new Path(largeName), "gz", null, conf, aconf),
+ new BCFile.Writer(fs.create(new Path(largeName)), null, "gz", conf, aconf),
blockSize)) {
small.startDefaultLocalityGroup();
large.startDefaultLocalityGroup();
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index f367ed0..9abbef6 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -42,16 +42,17 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream;
-import org.apache.accumulo.core.file.streams.PositionedDataOutputStream;
-import org.apache.accumulo.core.file.streams.PositionedOutput;
+import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
import org.apache.accumulo.core.security.crypto.CryptoModule;
import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
import org.apache.accumulo.core.security.crypto.SecretKeyEncryptionStrategy;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.compress.Compressor;
@@ -91,7 +92,7 @@ public final class BCFile {
* BCFile writer, the entry point for creating a new BCFile.
*/
static public class Writer implements Closeable {
- private final PositionedDataOutputStream out;
+ private final RateLimitedOutputStream out;
private final Configuration conf;
private final CryptoModule cryptoModule;
private BCFileCryptoModuleParameters cryptoParams;
@@ -106,23 +107,10 @@ public final class BCFile {
long errorCount = 0;
// reusable buffers.
private BytesWritable fsOutputBuffer;
+ private long length = 0;
- /**
- * Call-back interface to register a block after a block is closed.
- */
- private interface BlockRegister {
- /**
- * Register a block that is fully closed.
- *
- * @param raw
- * The size of block in terms of uncompressed bytes.
- * @param offsetStart
- * The start offset of the block.
- * @param offsetEnd
- * One byte after the end of the block. Compressed block size is offsetEnd -
- * offsetStart.
- */
- void register(long raw, long offsetStart, long offsetEnd);
+ public long getLength() {
+ return this.length;
}
/**
@@ -132,7 +120,7 @@ public final class BCFile {
private final Algorithm compressAlgo;
private Compressor compressor; // !null only if using native
// Hadoop compression
- private final PositionedDataOutputStream fsOut;
+ private final RateLimitedOutputStream fsOut;
private final OutputStream cipherOut;
private final long posStart;
private final SimpleBufferedOutputStream fsBufferedOutput;
@@ -144,7 +132,7 @@ public final class BCFile {
* @param cryptoModule
* the module to use to obtain cryptographic streams
*/
- public WBlockState(Algorithm compressionAlgo, PositionedDataOutputStream fsOut,
+ public WBlockState(Algorithm compressionAlgo, RateLimitedOutputStream fsOut,
BytesWritable fsOutputBuffer, Configuration conf, CryptoModule cryptoModule,
CryptoModuleParameters cryptoParams) throws IOException {
this.compressAlgo = compressionAlgo;
@@ -269,21 +257,27 @@ public final class BCFile {
*
*/
public class BlockAppender extends DataOutputStream {
- private final BlockRegister blockRegister;
+ private final MetaBlockRegister metaBlockRegister;
private final WBlockState wBlkState;
private boolean closed = false;
/**
* Constructor
*
- * @param register
+ * @param metaBlockRegister
* the block register, which is called when the block is closed.
* @param wbs
* The writable compression block state.
*/
- BlockAppender(BlockRegister register, WBlockState wbs) {
+ BlockAppender(MetaBlockRegister metaBlockRegister, WBlockState wbs) {
super(wbs.getOutputStream());
- this.blockRegister = register;
+ this.metaBlockRegister = metaBlockRegister;
+ this.wBlkState = wbs;
+ }
+
+ BlockAppender(WBlockState wbs) {
+ super(wbs.getOutputStream());
+ this.metaBlockRegister = null;
this.wBlkState = wbs;
}
@@ -334,7 +328,9 @@ public final class BCFile {
try {
++errorCount;
wBlkState.finish();
- blockRegister.register(getRawSize(), wBlkState.getStartPos(), wBlkState.getCurrentPos());
+ if (metaBlockRegister != null)
+ metaBlockRegister.register(getRawSize(), wBlkState.getStartPos(),
+ wBlkState.getCurrentPos());
--errorCount;
} finally {
closed = true;
@@ -352,16 +348,15 @@ public final class BCFile {
* Name of the compression algorithm, which will be used for all data blocks.
* @see Compression#getSupportedAlgorithms
*/
- public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fout,
- String compressionName, Configuration conf, boolean trackDataBlocks,
- AccumuloConfiguration accumuloConfiguration) throws IOException {
- if (fout.position() != 0) {
+ public Writer(FSDataOutputStream fout, RateLimiter writeLimiter, String compressionName,
+ Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
+ if (fout.getPos() != 0) {
throw new IOException("Output file not at zero offset.");
}
- this.out = new PositionedDataOutputStream(fout);
+ this.out = new RateLimitedOutputStream(fout, writeLimiter);
this.conf = conf;
- dataIndex = new DataIndex(compressionName, trackDataBlocks);
+ dataIndex = new DataIndex(compressionName);
metaIndex = new MetaIndex();
fsOutputBuffer = new BytesWritable();
Magic.write(this.out);
@@ -424,6 +419,8 @@ public final class BCFile {
Magic.write(out);
out.flush();
+ length = out.position();
+ out.close();
}
} finally {
closed = true;
@@ -485,11 +482,9 @@ public final class BCFile {
throw new IllegalStateException("Cannot create Data Block after Meta Blocks.");
}
- DataBlockRegister dbr = new DataBlockRegister();
-
WBlockState wbs = new WBlockState(getDefaultCompressionAlgorithm(), out, fsOutputBuffer, conf,
cryptoModule, cryptoParams);
- BlockAppender ba = new BlockAppender(dbr, wbs);
+ BlockAppender ba = new BlockAppender(wbs);
blkInProgress = true;
return ba;
}
@@ -497,7 +492,7 @@ public final class BCFile {
/**
* Callback to make sure a meta block is added to the internal list when its stream is closed.
*/
- private class MetaBlockRegister implements BlockRegister {
+ private class MetaBlockRegister {
private final String name;
private final Algorithm compressAlgo;
@@ -506,27 +501,11 @@ public final class BCFile {
this.compressAlgo = compressAlgo;
}
- @Override
public void register(long raw, long begin, long end) {
metaIndex.addEntry(
new MetaIndexEntry(name, compressAlgo, new BlockRegion(begin, end - begin, raw)));
}
}
-
- /**
- * Callback to make sure a data block is added to the internal list when it's being closed.
- *
- */
- private class DataBlockRegister implements BlockRegister {
- DataBlockRegister() {
- // do nothing
- }
-
- @Override
- public void register(long raw, long begin, long end) {
- dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw));
- }
- }
}
// sha256 of some random data
@@ -1083,8 +1062,6 @@ public final class BCFile {
// and raw size
private final ArrayList<BlockRegion> listRegions;
- private boolean trackBlocks;
-
// for read, deserialized from a file
public DataIndex(DataInput in) throws IOException {
defaultCompressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in));
@@ -1099,8 +1076,7 @@ public final class BCFile {
}
// for write
- public DataIndex(String defaultCompressionAlgorithmName, boolean trackBlocks) {
- this.trackBlocks = trackBlocks;
+ public DataIndex(String defaultCompressionAlgorithmName) {
this.defaultCompressionAlgorithm = Compression
.getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
listRegions = new ArrayList<>();
@@ -1114,11 +1090,6 @@ public final class BCFile {
return listRegions;
}
- public void addBlockRegion(BlockRegion region) {
- if (trackBlocks)
- listRegions.add(region);
- }
-
public void write(DataOutput out) throws IOException {
Utils.writeString(out, defaultCompressionAlgorithm.getName());
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
deleted file mode 100644
index 419e6b3..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.streams;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * A filter converting a {@link PositionedOutput} {@code OutputStream} to a {@code PositionedOutput}
- * {@code DataOutputStream}
- */
-public class PositionedDataOutputStream extends DataOutputStream implements PositionedOutput {
- public <StreamType extends OutputStream & PositionedOutput> PositionedDataOutputStream(
- StreamType type) {
- super(type);
- }
-
- @Override
- public long position() throws IOException {
- return ((PositionedOutput) out).position();
- }
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
deleted file mode 100644
index aa3122d..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.streams;
-
-import java.io.IOException;
-
-/**
- * For any byte sink (but especially OutputStream), the ability to report how many bytes have been
- * sunk.
- */
-public interface PositionedOutput {
- long position() throws IOException;
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
deleted file mode 100644
index 403c1a2..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.streams;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Objects;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-/**
- * Utility functions for {@link PositionedOutput}.
- */
-public class PositionedOutputs {
- private PositionedOutputs() {}
-
- /**
- * Convert an {@code OutputStream} into an {@code OutputStream} implementing
- * {@link PositionedOutput}.
- */
- public static PositionedOutputStream wrap(final OutputStream fout) {
- Objects.requireNonNull(fout);
- if (fout instanceof FSDataOutputStream) {
- return new PositionedOutputStream(fout) {
- @Override
- public long position() throws IOException {
- return ((FSDataOutputStream) fout).getPos();
- }
- };
- } else if (fout instanceof PositionedOutput) {
- return new PositionedOutputStream(fout) {
- @Override
- public long position() throws IOException {
- return ((PositionedOutput) fout).position();
- }
- };
- } else {
- return new PositionedOutputStream(fout) {
- @Override
- public long position() throws IOException {
- throw new UnsupportedOperationException("Underlying stream does not support position()");
- }
- };
- }
- }
-
- private static abstract class PositionedOutputStream extends FilterOutputStream
- implements PositionedOutput {
- public PositionedOutputStream(OutputStream stream) {
- super(stream);
- }
-
- @Override
- public void write(byte[] data, int off, int len) throws IOException {
- out.write(data, off, len);
- }
- }
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
index 417b89c..b83b898 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
@@ -16,21 +16,22 @@
*/
package org.apache.accumulo.core.file.streams;
-import java.io.FilterOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.hadoop.fs.FSDataOutputStream;
/**
* A decorator for {@code OutputStream} which limits the rate at which data may be written.
+ * Underlying OutputStream is a FSDataOutputStream.
*/
-public class RateLimitedOutputStream extends FilterOutputStream implements PositionedOutput {
+public class RateLimitedOutputStream extends DataOutputStream {
private final RateLimiter writeLimiter;
- public RateLimitedOutputStream(OutputStream wrappedStream, RateLimiter writeLimiter) {
- super(PositionedOutputs.wrap(wrappedStream));
+ public RateLimitedOutputStream(FSDataOutputStream fsDataOutputStream, RateLimiter writeLimiter) {
+ super(fsDataOutputStream);
this.writeLimiter = writeLimiter == null ? NullRateLimiter.INSTANCE : writeLimiter;
}
@@ -51,8 +52,7 @@ public class RateLimitedOutputStream extends FilterOutputStream implements Posit
out.close();
}
- @Override
public long position() throws IOException {
- return ((PositionedOutput) out).position();
+ return ((FSDataOutputStream) out).getPos();
}
}
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
index 8e09355..086e8b9 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
@@ -24,7 +24,7 @@ import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -56,8 +56,8 @@ public class CreateCompatTestFile {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
- CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs, new Path(args[0]), "gz", null,
- conf, DefaultConfiguration.getInstance());
+ BCFile.Writer _cbw = new BCFile.Writer(fs.create(new Path(args[0])), null, "gz", conf,
+ DefaultConfiguration.getInstance());
RFile.Writer writer = new RFile.Writer(_cbw, 1000);
writer.startNewLocalityGroup("lg1",
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 656e7da..dd316a4 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -31,7 +31,6 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Writer;
import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
-import org.apache.accumulo.core.file.streams.PositionedOutputs;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -56,8 +55,8 @@ public class MultiLevelIndexTest extends TestCase {
AccumuloConfiguration aconf = DefaultConfiguration.getInstance();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
- CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos), "gz",
- CachedConfiguration.getInstance(), aconf);
+ BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", CachedConfiguration.getInstance(),
+ aconf);
BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize));
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index ee7b02b..ca19a1f 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@ -46,7 +46,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
-import org.apache.accumulo.core.file.streams.PositionedOutputs;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
@@ -147,8 +147,7 @@ public class MultiThreadedRFileTest {
FileSystem fs = FileSystem.newInstance(conf);
Path path = new Path("file://" + rfile);
dos = fs.create(path, true);
- CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos),
- "gz", conf, accumuloConfiguration);
+ BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, accumuloConfiguration);
SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl
.newSamplerConfig(accumuloConfiguration);
Sampler sampler = null;
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index f968b21..e23fbd7 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -66,7 +66,7 @@ import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache;
import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.accumulo.core.file.rfile.RFile.Reader;
-import org.apache.accumulo.core.file.streams.PositionedOutputs;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
import org.apache.accumulo.core.metadata.MetadataTable;
@@ -230,8 +230,7 @@ public class RFileTest {
public void openWriter(boolean startDLG, int blockSize) throws IOException {
baos = new ByteArrayOutputStream();
dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
- CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos),
- "gz", conf, accumuloConfiguration);
+ BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, accumuloConfiguration);
SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl
.newSamplerConfig(accumuloConfiguration);
diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
index 18d229b..fbcc164 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
@@ -16,10 +16,10 @@
*/
package org.apache.accumulo.core.file.streams;
-import java.io.FilterOutputStream;
import java.io.IOException;
import java.util.Random;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.junit.Assert;
import org.junit.Test;
@@ -45,14 +45,9 @@ public class RateLimitedOutputStreamTest {
Assert.assertEquals(bytesWritten, rateLimiter.getPermitsAcquired());
}
- public static class NullOutputStream extends FilterOutputStream implements PositionedOutput {
- public NullOutputStream() {
- super(new CountingOutputStream(ByteStreams.nullOutputStream()));
- }
-
- @Override
- public long position() throws IOException {
- return ((CountingOutputStream) out).getCount();
+ public static class NullOutputStream extends FSDataOutputStream {
+ public NullOutputStream() throws IOException {
+ super(new CountingOutputStream(ByteStreams.nullOutputStream()), null);
}
}
--
To stop receiving notification emails like this one, please contact
mmiller@apache.org.