You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/02/11 16:47:14 UTC

[2/4] cassandra git commit: Cleanup SegmentedFile API

Cleanup SegmentedFile API

patch by benedict; reviewed by marcus for CASSANDRA-8749


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9d4909ee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9d4909ee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9d4909ee

Branch: refs/heads/trunk
Commit: 9d4909ee362be5082a6677bdbfba9d2b29848bfd
Parents: 207751c
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Feb 11 15:45:18 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Feb 11 15:45:18 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedRandomAccessReader.java  | 15 ++++---------
 .../cassandra/io/sstable/SSTableReader.java     | 13 +++---------
 .../io/util/BufferedPoolingSegmentedFile.java   |  5 -----
 .../io/util/CompressedPoolingSegmentedFile.java | 16 +++++++++++++-
 .../io/util/CompressedSegmentedFile.java        | 14 +++++++++----
 .../cassandra/io/util/MappedFileDataInput.java  |  9 --------
 .../cassandra/io/util/MmappedSegmentedFile.java |  5 +++--
 .../cassandra/io/util/PoolingSegmentedFile.java |  9 ++++++--
 .../apache/cassandra/io/util/SegmentedFile.java | 22 ++++++++++++++++++--
 10 files changed, 63 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d4909ee/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7352068..3f04637 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * Cleanup SegmentedFile API (CASSANDRA-8749)
  * Avoid overlap with early compaction replacement (CASSANDRA-8683)
  * Safer Resource Management++ (CASSANDRA-8707)
  * Write partition size estimates into a system table (CASSANDRA-7688)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d4909ee/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 4521c19..49dcd3d 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -37,6 +37,10 @@ import org.apache.cassandra.utils.FBUtilities;
  */
 public class CompressedRandomAccessReader extends RandomAccessReader
 {
+    public static CompressedRandomAccessReader open(String dataFilePath, CompressionMetadata metadata)
+    {
+        return open(dataFilePath, metadata, null);
+    }
     public static CompressedRandomAccessReader open(String path, CompressionMetadata metadata, CompressedPoolingSegmentedFile owner)
     {
         try
@@ -49,17 +53,6 @@ public class CompressedRandomAccessReader extends RandomAccessReader
         }
     }
 
-    public static CompressedRandomAccessReader open(String dataFilePath, CompressionMetadata metadata)
-    {
-        try
-        {
-            return new CompressedRandomAccessReader(dataFilePath, metadata, null);
-        }
-        catch (FileNotFoundException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
 
     private final CompressionMetadata metadata;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d4909ee/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a588bff..f59e632 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -77,8 +77,6 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
-import org.apache.cassandra.io.compress.CompressedThrottledReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
@@ -93,7 +91,6 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.ICompressedFile;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.io.util.SegmentedFile;
-import org.apache.cassandra.io.util.ThrottledReader;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -1852,21 +1849,17 @@ public class SSTableReader extends SSTable implements RefCounted<SSTableReader>
     public RandomAccessReader openDataReader(RateLimiter limiter)
     {
         assert limiter != null;
-        return compression
-               ? CompressedThrottledReader.open(getFilename(), getCompressionMetadata(), limiter)
-               : ThrottledReader.open(new File(getFilename()), limiter);
+        return dfile.createThrottledReader(limiter);
     }
 
     public RandomAccessReader openDataReader()
     {
-        return compression
-               ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata())
-               : RandomAccessReader.open(new File(getFilename()));
+        return dfile.createReader();
     }
 
     public RandomAccessReader openIndexReader()
     {
-        return RandomAccessReader.open(new File(getIndexFilename()));
+        return ifile.createReader();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d4909ee/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index 8334965..e4c363a 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -51,9 +51,4 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
             return new BufferedPoolingSegmentedFile(path, length);
         }
     }
-
-    protected RandomAccessReader createReader(String path)
-    {
-        return RandomAccessReader.open(new File(path), this);
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d4909ee/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index 94d23bf..c514b80 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -17,8 +17,11 @@
 */
 package org.apache.cassandra.io.util;
 
+import com.google.common.util.concurrent.RateLimiter;
+
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressedThrottledReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 
@@ -70,7 +73,18 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
             return new CompressedPoolingSegmentedFile(path, metadata(path, finishType));
         }
     }
-    protected RandomAccessReader createReader(String path)
+
+    public RandomAccessReader createReader()
+    {
+        return CompressedRandomAccessReader.open(path, metadata, null);
+    }
+
+    public RandomAccessReader createThrottledReader(RateLimiter limiter)
+    {
+        return CompressedThrottledReader.open(path, metadata, limiter);
+    }
+
+    protected RandomAccessReader createPooledReader()
     {
         return CompressedRandomAccessReader.open(path, metadata, this);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d4909ee/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 0c20bb9..6b5c2e1 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -17,8 +17,11 @@
  */
 package org.apache.cassandra.io.util;
 
+import com.google.common.util.concurrent.RateLimiter;
+
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressedThrottledReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 
@@ -84,11 +87,14 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
         }
     }
 
-    public FileDataInput getSegment(long position)
+    public RandomAccessReader createReader()
+    {
+        return CompressedRandomAccessReader.open(path, metadata);
+    }
+
+    public RandomAccessReader createThrottledReader(RateLimiter limiter)
     {
-        RandomAccessReader reader = CompressedRandomAccessReader.open(path, metadata, null);
-        reader.seek(position);
-        return reader;
+        return CompressedThrottledReader.open(path, metadata, limiter);
     }
 
     public CompressionMetadata getMetadata()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d4909ee/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
index fb5b62e..d056240 100644
--- a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
@@ -31,15 +31,6 @@ public class MappedFileDataInput extends AbstractDataInput implements FileDataIn
     private final long segmentOffset;
     private int position;
 
-    public MappedFileDataInput(FileInputStream stream, String filename, long segmentOffset, int position) throws IOException
-    {
-        FileChannel channel = stream.getChannel();
-        buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, channel.size());
-        this.filename = filename;
-        this.segmentOffset = segmentOffset;
-        this.position = position;
-    }
-
     public MappedFileDataInput(MappedByteBuffer buffer, String filename, long segmentOffset, int position)
     {
         assert buffer != null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d4909ee/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 8b4ae9d..8067c68 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -89,7 +89,8 @@ public class MmappedSegmentedFile extends SegmentedFile
             return new MappedFileDataInput(segment.right, path, segment.left, (int) (position - segment.left));
         }
 
-        // not mmap'd: open a braf covering the segment
+        // we can have single cells or partitions larger than 2GB, which is our maximum addressable range in a single segment;
+        // in this case we open as a normal random access reader
         // FIXME: brafs are unbounded, so this segment will cover the rest of the file, rather than just the row
         RandomAccessReader file = RandomAccessReader.open(new File(path));
         file.seek(position);
@@ -152,7 +153,7 @@ public class MmappedSegmentedFile extends SegmentedFile
         public Builder()
         {
             super();
-            boundaries = new ArrayList<Long>();
+            boundaries = new ArrayList<>();
             boundaries.add(0L);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d4909ee/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index daca22f..d3c90c7 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.io.util;
 
+import java.io.File;
+
 import org.apache.cassandra.service.FileCacheService;
 
 public abstract class PoolingSegmentedFile extends SegmentedFile
@@ -57,13 +59,16 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
         RandomAccessReader reader = FileCacheService.instance.get(cacheKey);
 
         if (reader == null)
-            reader = createReader(path);
+            reader = createPooledReader();
 
         reader.seek(position);
         return reader;
     }
 
-    protected abstract RandomAccessReader createReader(String path);
+    protected RandomAccessReader createPooledReader()
+    {
+        return RandomAccessReader.open(new File(path), this);
+    }
 
     public void recycle(RandomAccessReader reader)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d4909ee/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index d557b72..510ed81 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -25,6 +25,8 @@ import java.nio.MappedByteBuffer;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
+import com.google.common.util.concurrent.RateLimiter;
+
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
@@ -92,6 +94,24 @@ public abstract class SegmentedFile extends SharedCloseableImpl
 
     public abstract SegmentedFile sharedCopy();
 
+    public RandomAccessReader createReader()
+    {
+        return RandomAccessReader.open(new File(path));
+    }
+
+    public RandomAccessReader createThrottledReader(RateLimiter limiter)
+    {
+        assert limiter != null;
+        return ThrottledReader.open(new File(path), limiter);
+    }
+
+    public FileDataInput getSegment(long position)
+    {
+        RandomAccessReader reader = createReader();
+        reader.seek(position);
+        return reader;
+    }
+
     /**
      * @return A SegmentedFile.Builder.
      */
@@ -112,8 +132,6 @@ public abstract class SegmentedFile extends SharedCloseableImpl
         return new CompressedPoolingSegmentedFile.Builder(writer);
     }
 
-    public abstract FileDataInput getSegment(long position);
-
     /**
      * @return An Iterator over segments, beginning with the segment containing the given position: each segment must be closed after use.
      */