You are viewing a plain text version of this content. A canonical link to the original is available from the mailing-list archive.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/05/18 21:14:34 UTC

[GitHub] milleruntime closed pull request #487: Inline inner Rfile classes and interfaces

milleruntime closed pull request #487: Inline inner Rfile classes and interfaces
URL: https://github.com/apache/accumulo/pull/487
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (it would not otherwise be visible, since GitHub hides the original diff once a fork's pull request is merged):

diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 43e97304f5..de3d88da90 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -20,7 +20,6 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.UncheckedIOException;
 import java.util.Collections;
 import java.util.Map;
@@ -35,9 +34,7 @@
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
-import org.apache.accumulo.core.file.streams.PositionedOutput;
 import org.apache.accumulo.core.file.streams.RateLimitedInputStream;
-import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,57 +55,6 @@ private CachableBlockFile() {}
 
   private static final Logger log = LoggerFactory.getLogger(CachableBlockFile.class);
 
-  public static class Writer implements Closeable {
-    private BCFile.Writer _bc;
-    private BCFile.Writer.BlockAppender _bw;
-    private final PositionedOutput fsout;
-    private long length = 0;
-
-    public Writer(FileSystem fs, Path fName, String compressAlgor, RateLimiter writeLimiter,
-        Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
-      this(new RateLimitedOutputStream(fs.create(fName), writeLimiter), compressAlgor, conf,
-          accumuloConfiguration);
-    }
-
-    public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fsout,
-        String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
-      this.fsout = fsout;
-      init(fsout, compressAlgor, conf, accumuloConfiguration);
-    }
-
-    private <OutputStreamT extends OutputStream & PositionedOutput> void init(OutputStreamT fsout,
-        String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
-      _bc = new BCFile.Writer(fsout, compressAlgor, conf, false, accumuloConfiguration);
-    }
-
-    public BCFile.Writer.BlockAppender prepareMetaBlock(String name) throws IOException {
-      _bw = _bc.prepareMetaBlock(name);
-      return _bw;
-    }
-
-    public BCFile.Writer.BlockAppender prepareDataBlock() throws IOException {
-      _bw = _bc.prepareDataBlock();
-      return _bw;
-    }
-
-    @Override
-    public void close() throws IOException {
-
-      _bw.close();
-      _bc.close();
-
-      length = this.fsout.position();
-      ((OutputStream) this.fsout).close();
-    }
-
-    public long getLength() throws IOException {
-      return length;
-    }
-
-  }
-
   private static interface IoeSupplier<T> {
     T get() throws IOException;
   }
@@ -482,15 +428,6 @@ private CachedBlockRead(SeekableByteArrayInputStream seekableInput, CacheEntry c
       indexable = true;
     }
 
-    /**
-     * It is intended that the caller of this method will close the stream we also only intend that
-     * this be called once per BlockRead. This method is provide for methods up stream that expect
-     * to receive a DataInputStream object.
-     */
-    public DataInputStream getStream() {
-      return this;
-    }
-
     public void seek(int position) {
       seekableInput.seek(position);
     }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
index 992a3ece3a..a71c79e2cf 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
@@ -507,9 +507,9 @@ public void close(DataOutput out) throws IOException {
 
     private boolean addedLast = false;
 
-    private CachableBlockFile.Writer blockFileWriter;
+    private BCFile.Writer blockFileWriter;
 
-    Writer(CachableBlockFile.Writer blockFileWriter, int maxBlockSize) {
+    Writer(BCFile.Writer blockFileWriter, int maxBlockSize) {
       this.blockFileWriter = blockFileWriter;
       this.threshold = maxBlockSize;
       levels = new ArrayList<>();
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 7247b60a61..0c850663f3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -56,6 +56,7 @@
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
 import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
@@ -163,8 +164,7 @@ public LocalityGroupMetadata(int version, CachableBlockFile.Reader br) {
       this.version = version;
     }
 
-    public LocalityGroupMetadata(Set<ByteSequence> pcf, int indexBlockSize,
-        CachableBlockFile.Writer bfw) {
+    public LocalityGroupMetadata(Set<ByteSequence> pcf, int indexBlockSize, BCFile.Writer bfw) {
       isDefaultLG = true;
       columnFamilies = new HashMap<>();
       previousColumnFamilies = pcf;
@@ -174,7 +174,7 @@ public LocalityGroupMetadata(Set<ByteSequence> pcf, int indexBlockSize,
     }
 
     public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int indexBlockSize,
-        CachableBlockFile.Writer bfw) {
+        BCFile.Writer bfw) {
       this.name = name;
       isDefaultLG = false;
       columnFamilies = new HashMap<>();
@@ -422,7 +422,7 @@ public void flushIfNeeded() throws IOException {
 
   private static class LocalityGroupWriter {
 
-    private CachableBlockFile.Writer fileWriter;
+    private BCFile.Writer fileWriter;
     private BlockAppender blockWriter;
 
     private final long blockSize;
@@ -441,7 +441,7 @@ public void flushIfNeeded() throws IOException {
     private RollingStats keyLenStats = new RollingStats(2017);
     private double averageKeySize = 0;
 
-    LocalityGroupWriter(CachableBlockFile.Writer fileWriter, long blockSize, long maxBlockSize,
+    LocalityGroupWriter(BCFile.Writer fileWriter, long blockSize, long maxBlockSize,
         LocalityGroupMetadata currentLocalityGroup, SampleLocalityGroupWriter sample) {
       this.fileWriter = fileWriter;
       this.blockSize = blockSize;
@@ -552,7 +552,7 @@ public void close() throws IOException {
     public static final int MAX_CF_IN_DLG = 1000;
     private static final double MAX_BLOCK_MULTIPLIER = 1.1;
 
-    private CachableBlockFile.Writer fileWriter;
+    private BCFile.Writer fileWriter;
 
     private final long blockSize;
     private final long maxBlockSize;
@@ -575,12 +575,12 @@ public void close() throws IOException {
     private SamplerConfigurationImpl samplerConfig;
     private Sampler sampler;
 
-    public Writer(CachableBlockFile.Writer bfw, int blockSize) throws IOException {
+    public Writer(BCFile.Writer bfw, int blockSize) throws IOException {
       this(bfw, blockSize, (int) DefaultConfiguration.getInstance()
           .getAsBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX), null, null);
     }
 
-    public Writer(CachableBlockFile.Writer bfw, int blockSize, int indexBlockSize,
+    public Writer(BCFile.Writer bfw, int blockSize, int indexBlockSize,
         SamplerConfigurationImpl samplerConfig, Sampler sampler) throws IOException {
       this.blockSize = blockSize;
       this.maxBlockSize = (long) (blockSize * MAX_BLOCK_MULTIPLIER);
@@ -662,7 +662,7 @@ public void append(Key key, Value value) throws IOException {
     public DataOutputStream createMetaStore(String name) throws IOException {
       closeData();
 
-      return (DataOutputStream) fileWriter.prepareMetaBlock(name);
+      return fileWriter.prepareMetaBlock(name);
     }
 
     private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies)
@@ -1340,7 +1340,7 @@ public Key getLastKey() throws IOException {
     @Override
     public DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException {
       try {
-        return this.reader.getMetaBlock(name).getStream();
+        return this.reader.getMetaBlock(name);
       } catch (MetaBlockDoesNotExist e) {
         throw new NoSuchMetaStoreException("name = " + name, e);
       }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 0054db2e85..5d8705e686 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -30,7 +30,7 @@
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
-import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.sample.impl.SamplerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -128,9 +128,8 @@ protected FileSKVWriter openWriter(OpenWriterOperation options) throws IOExcepti
       outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
     }
 
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(
-        new RateLimitedOutputStream(outputStream, options.getRateLimiter()), compression, conf,
-        acuconf);
+    BCFile.Writer _cbw = new BCFile.Writer(outputStream, options.getRateLimiter(), compression,
+        conf, acuconf);
 
     return new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
index c3663cd72f..48f2873bfb 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
@@ -29,6 +29,7 @@
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
 import org.apache.accumulo.core.file.rfile.RFile.Writer;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,10 +72,10 @@ public static void main(String[] args) throws Exception {
         int blockSize = (int) aconf.getAsBytes(Property.TABLE_FILE_BLOCK_SIZE);
         try (
             Writer small = new RFile.Writer(
-                new CachableBlockFile.Writer(fs, new Path(smallName), "gz", null, conf, aconf),
+                new BCFile.Writer(fs.create(new Path(smallName)), null, "gz", conf, aconf),
                 blockSize);
             Writer large = new RFile.Writer(
-                new CachableBlockFile.Writer(fs, new Path(largeName), "gz", null, conf, aconf),
+                new BCFile.Writer(fs.create(new Path(largeName)), null, "gz", conf, aconf),
                 blockSize)) {
           small.startDefaultLocalityGroup();
           large.startDefaultLocalityGroup();
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index f367ed0caa..9abbef6059 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -42,16 +42,17 @@
 import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
 import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
 import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream;
-import org.apache.accumulo.core.file.streams.PositionedDataOutputStream;
-import org.apache.accumulo.core.file.streams.PositionedOutput;
+import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
 import org.apache.accumulo.core.security.crypto.CryptoModule;
 import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
 import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
 import org.apache.accumulo.core.security.crypto.SecretKeyEncryptionStrategy;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.compress.Compressor;
@@ -91,7 +92,7 @@ private BCFile() {
    * BCFile writer, the entry point for creating a new BCFile.
    */
   static public class Writer implements Closeable {
-    private final PositionedDataOutputStream out;
+    private final RateLimitedOutputStream out;
     private final Configuration conf;
     private final CryptoModule cryptoModule;
     private BCFileCryptoModuleParameters cryptoParams;
@@ -106,23 +107,10 @@ private BCFile() {
     long errorCount = 0;
     // reusable buffers.
     private BytesWritable fsOutputBuffer;
+    private long length = 0;
 
-    /**
-     * Call-back interface to register a block after a block is closed.
-     */
-    private interface BlockRegister {
-      /**
-       * Register a block that is fully closed.
-       *
-       * @param raw
-       *          The size of block in terms of uncompressed bytes.
-       * @param offsetStart
-       *          The start offset of the block.
-       * @param offsetEnd
-       *          One byte after the end of the block. Compressed block size is offsetEnd -
-       *          offsetStart.
-       */
-      void register(long raw, long offsetStart, long offsetEnd);
+    public long getLength() {
+      return this.length;
     }
 
     /**
@@ -132,7 +120,7 @@ private BCFile() {
       private final Algorithm compressAlgo;
       private Compressor compressor; // !null only if using native
       // Hadoop compression
-      private final PositionedDataOutputStream fsOut;
+      private final RateLimitedOutputStream fsOut;
       private final OutputStream cipherOut;
       private final long posStart;
       private final SimpleBufferedOutputStream fsBufferedOutput;
@@ -144,7 +132,7 @@ private BCFile() {
        * @param cryptoModule
        *          the module to use to obtain cryptographic streams
        */
-      public WBlockState(Algorithm compressionAlgo, PositionedDataOutputStream fsOut,
+      public WBlockState(Algorithm compressionAlgo, RateLimitedOutputStream fsOut,
           BytesWritable fsOutputBuffer, Configuration conf, CryptoModule cryptoModule,
           CryptoModuleParameters cryptoParams) throws IOException {
         this.compressAlgo = compressionAlgo;
@@ -269,21 +257,27 @@ public void finish() throws IOException {
      *
      */
     public class BlockAppender extends DataOutputStream {
-      private final BlockRegister blockRegister;
+      private final MetaBlockRegister metaBlockRegister;
       private final WBlockState wBlkState;
       private boolean closed = false;
 
       /**
        * Constructor
        *
-       * @param register
+       * @param metaBlockRegister
        *          the block register, which is called when the block is closed.
        * @param wbs
        *          The writable compression block state.
        */
-      BlockAppender(BlockRegister register, WBlockState wbs) {
+      BlockAppender(MetaBlockRegister metaBlockRegister, WBlockState wbs) {
         super(wbs.getOutputStream());
-        this.blockRegister = register;
+        this.metaBlockRegister = metaBlockRegister;
+        this.wBlkState = wbs;
+      }
+
+      BlockAppender(WBlockState wbs) {
+        super(wbs.getOutputStream());
+        this.metaBlockRegister = null;
         this.wBlkState = wbs;
       }
 
@@ -334,7 +328,9 @@ public void close() throws IOException {
         try {
           ++errorCount;
           wBlkState.finish();
-          blockRegister.register(getRawSize(), wBlkState.getStartPos(), wBlkState.getCurrentPos());
+          if (metaBlockRegister != null)
+            metaBlockRegister.register(getRawSize(), wBlkState.getStartPos(),
+                wBlkState.getCurrentPos());
           --errorCount;
         } finally {
           closed = true;
@@ -352,16 +348,15 @@ public void close() throws IOException {
      *          Name of the compression algorithm, which will be used for all data blocks.
      * @see Compression#getSupportedAlgorithms
      */
-    public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fout,
-        String compressionName, Configuration conf, boolean trackDataBlocks,
-        AccumuloConfiguration accumuloConfiguration) throws IOException {
-      if (fout.position() != 0) {
+    public Writer(FSDataOutputStream fout, RateLimiter writeLimiter, String compressionName,
+        Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
+      if (fout.getPos() != 0) {
         throw new IOException("Output file not at zero offset.");
       }
 
-      this.out = new PositionedDataOutputStream(fout);
+      this.out = new RateLimitedOutputStream(fout, writeLimiter);
       this.conf = conf;
-      dataIndex = new DataIndex(compressionName, trackDataBlocks);
+      dataIndex = new DataIndex(compressionName);
       metaIndex = new MetaIndex();
       fsOutputBuffer = new BytesWritable();
       Magic.write(this.out);
@@ -424,6 +419,8 @@ public void close() throws IOException {
 
           Magic.write(out);
           out.flush();
+          length = out.position();
+          out.close();
         }
       } finally {
         closed = true;
@@ -485,11 +482,9 @@ public BlockAppender prepareDataBlock() throws IOException {
         throw new IllegalStateException("Cannot create Data Block after Meta Blocks.");
       }
 
-      DataBlockRegister dbr = new DataBlockRegister();
-
       WBlockState wbs = new WBlockState(getDefaultCompressionAlgorithm(), out, fsOutputBuffer, conf,
           cryptoModule, cryptoParams);
-      BlockAppender ba = new BlockAppender(dbr, wbs);
+      BlockAppender ba = new BlockAppender(wbs);
       blkInProgress = true;
       return ba;
     }
@@ -497,7 +492,7 @@ public BlockAppender prepareDataBlock() throws IOException {
     /**
      * Callback to make sure a meta block is added to the internal list when its stream is closed.
      */
-    private class MetaBlockRegister implements BlockRegister {
+    private class MetaBlockRegister {
       private final String name;
       private final Algorithm compressAlgo;
 
@@ -506,27 +501,11 @@ public BlockAppender prepareDataBlock() throws IOException {
         this.compressAlgo = compressAlgo;
       }
 
-      @Override
       public void register(long raw, long begin, long end) {
         metaIndex.addEntry(
             new MetaIndexEntry(name, compressAlgo, new BlockRegion(begin, end - begin, raw)));
       }
     }
-
-    /**
-     * Callback to make sure a data block is added to the internal list when it's being closed.
-     *
-     */
-    private class DataBlockRegister implements BlockRegister {
-      DataBlockRegister() {
-        // do nothing
-      }
-
-      @Override
-      public void register(long raw, long begin, long end) {
-        dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw));
-      }
-    }
   }
 
   // sha256 of some random data
@@ -1083,8 +1062,6 @@ public void write(DataOutput out) throws IOException {
     // and raw size
     private final ArrayList<BlockRegion> listRegions;
 
-    private boolean trackBlocks;
-
     // for read, deserialized from a file
     public DataIndex(DataInput in) throws IOException {
       defaultCompressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in));
@@ -1099,8 +1076,7 @@ public DataIndex(DataInput in) throws IOException {
     }
 
     // for write
-    public DataIndex(String defaultCompressionAlgorithmName, boolean trackBlocks) {
-      this.trackBlocks = trackBlocks;
+    public DataIndex(String defaultCompressionAlgorithmName) {
       this.defaultCompressionAlgorithm = Compression
           .getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
       listRegions = new ArrayList<>();
@@ -1114,11 +1090,6 @@ public Algorithm getDefaultCompressionAlgorithm() {
       return listRegions;
     }
 
-    public void addBlockRegion(BlockRegion region) {
-      if (trackBlocks)
-        listRegions.add(region);
-    }
-
     public void write(DataOutput out) throws IOException {
       Utils.writeString(out, defaultCompressionAlgorithm.getName());
 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
deleted file mode 100644
index 419e6b309e..0000000000
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.streams;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * A filter converting a {@link PositionedOutput} {@code OutputStream} to a {@code PositionedOutput}
- * {@code DataOutputStream}
- */
-public class PositionedDataOutputStream extends DataOutputStream implements PositionedOutput {
-  public <StreamType extends OutputStream & PositionedOutput> PositionedDataOutputStream(
-      StreamType type) {
-    super(type);
-  }
-
-  @Override
-  public long position() throws IOException {
-    return ((PositionedOutput) out).position();
-  }
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
deleted file mode 100644
index aa3122d756..0000000000
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.streams;
-
-import java.io.IOException;
-
-/**
- * For any byte sink (but especially OutputStream), the ability to report how many bytes have been
- * sunk.
- */
-public interface PositionedOutput {
-  long position() throws IOException;
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
deleted file mode 100644
index 403c1a2a34..0000000000
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.streams;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Objects;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-/**
- * Utility functions for {@link PositionedOutput}.
- */
-public class PositionedOutputs {
-  private PositionedOutputs() {}
-
-  /**
-   * Convert an {@code OutputStream} into an {@code OutputStream} implementing
-   * {@link PositionedOutput}.
-   */
-  public static PositionedOutputStream wrap(final OutputStream fout) {
-    Objects.requireNonNull(fout);
-    if (fout instanceof FSDataOutputStream) {
-      return new PositionedOutputStream(fout) {
-        @Override
-        public long position() throws IOException {
-          return ((FSDataOutputStream) fout).getPos();
-        }
-      };
-    } else if (fout instanceof PositionedOutput) {
-      return new PositionedOutputStream(fout) {
-        @Override
-        public long position() throws IOException {
-          return ((PositionedOutput) fout).position();
-        }
-      };
-    } else {
-      return new PositionedOutputStream(fout) {
-        @Override
-        public long position() throws IOException {
-          throw new UnsupportedOperationException("Underlying stream does not support position()");
-        }
-      };
-    }
-  }
-
-  private static abstract class PositionedOutputStream extends FilterOutputStream
-      implements PositionedOutput {
-    public PositionedOutputStream(OutputStream stream) {
-      super(stream);
-    }
-
-    @Override
-    public void write(byte[] data, int off, int len) throws IOException {
-      out.write(data, off, len);
-    }
-  }
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
index 417b89c963..b83b8985b7 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
@@ -16,21 +16,22 @@
  */
 package org.apache.accumulo.core.file.streams;
 
-import java.io.FilterOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 
 import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.hadoop.fs.FSDataOutputStream;
 
 /**
  * A decorator for {@code OutputStream} which limits the rate at which data may be written.
+ * Underlying OutputStream is a FSDataOutputStream.
  */
-public class RateLimitedOutputStream extends FilterOutputStream implements PositionedOutput {
+public class RateLimitedOutputStream extends DataOutputStream {
   private final RateLimiter writeLimiter;
 
-  public RateLimitedOutputStream(OutputStream wrappedStream, RateLimiter writeLimiter) {
-    super(PositionedOutputs.wrap(wrappedStream));
+  public RateLimitedOutputStream(FSDataOutputStream fsDataOutputStream, RateLimiter writeLimiter) {
+    super(fsDataOutputStream);
     this.writeLimiter = writeLimiter == null ? NullRateLimiter.INSTANCE : writeLimiter;
   }
 
@@ -51,8 +52,7 @@ public void close() throws IOException {
     out.close();
   }
 
-  @Override
   public long position() throws IOException {
-    return ((PositionedOutput) out).position();
+    return ((FSDataOutputStream) out).getPos();
   }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
index 8e093557b4..086e8b9931 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
@@ -24,7 +24,7 @@
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -56,8 +56,8 @@ private static String formatStr(String prefix, int i) {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs, new Path(args[0]), "gz", null,
-        conf, DefaultConfiguration.getInstance());
+    BCFile.Writer _cbw = new BCFile.Writer(fs.create(new Path(args[0])), null, "gz", conf,
+        DefaultConfiguration.getInstance());
     RFile.Writer writer = new RFile.Writer(_cbw, 1000);
 
     writer.startNewLocalityGroup("lg1",
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 656e7daba1..dd316a4492 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -31,7 +31,6 @@
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Writer;
 import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
-import org.apache.accumulo.core.file.streams.PositionedOutputs;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -56,8 +55,8 @@ private void runTest(int maxBlockSize, int num) throws IOException {
     AccumuloConfiguration aconf = DefaultConfiguration.getInstance();
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos), "gz",
-        CachedConfiguration.getInstance(), aconf);
+    BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", CachedConfiguration.getInstance(),
+        aconf);
 
     BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize));
 
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index ee7b02b7d5..ca19a1f2a5 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@ -46,7 +46,7 @@
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
-import org.apache.accumulo.core.file.streams.PositionedOutputs;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
@@ -147,8 +147,7 @@ public void openWriter(boolean startDLG) throws IOException {
       FileSystem fs = FileSystem.newInstance(conf);
       Path path = new Path("file://" + rfile);
       dos = fs.create(path, true);
-      CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos),
-          "gz", conf, accumuloConfiguration);
+      BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, accumuloConfiguration);
       SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl
           .newSamplerConfig(accumuloConfiguration);
       Sampler sampler = null;
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index f968b21732..e23fbd753e 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -66,7 +66,7 @@
 import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
-import org.apache.accumulo.core.file.streams.PositionedOutputs;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 import org.apache.accumulo.core.metadata.MetadataTable;
@@ -230,8 +230,7 @@ public void openWriter(boolean startDLG) throws IOException {
     public void openWriter(boolean startDLG, int blockSize) throws IOException {
       baos = new ByteArrayOutputStream();
       dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
-      CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos),
-          "gz", conf, accumuloConfiguration);
+      BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, accumuloConfiguration);
 
       SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl
           .newSamplerConfig(accumuloConfiguration);
diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
index 18d229b4bb..fbcc1640cb 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
@@ -16,10 +16,10 @@
  */
 package org.apache.accumulo.core.file.streams;
 
-import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.util.Random;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -45,14 +45,9 @@ public void permitsAreProperlyAcquired() throws Exception {
     Assert.assertEquals(bytesWritten, rateLimiter.getPermitsAcquired());
   }
 
-  public static class NullOutputStream extends FilterOutputStream implements PositionedOutput {
-    public NullOutputStream() {
-      super(new CountingOutputStream(ByteStreams.nullOutputStream()));
-    }
-
-    @Override
-    public long position() throws IOException {
-      return ((CountingOutputStream) out).getCount();
+  public static class NullOutputStream extends FSDataOutputStream {
+    public NullOutputStream() throws IOException {
+      super(new CountingOutputStream(ByteStreams.nullOutputStream()), null);
     }
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services