You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2016/06/21 19:48:49 UTC

cassandra git commit: Implement / integrate FileSegmentInputStream.seek() into CommitLogReader

Repository: cassandra
Updated Branches:
  refs/heads/trunk 6a7985d87 -> 9343bd407


Implement / integrate FileSegmentInputStream.seek() into CommitLogReader

Patch by ichaudhry; reviewed by jmckenzie for CASSANDRA-11957


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9343bd40
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9343bd40
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9343bd40

Branch: refs/heads/trunk
Commit: 9343bd4070d69f9c1558656deccfd8e3692c2c80
Parents: 6a7985d
Author: Imran Chaudhry <im...@datastax.com>
Authored: Tue Jun 21 13:20:32 2016 -0400
Committer: Josh McKenzie <jm...@apache.org>
Committed: Tue Jun 21 15:48:00 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/commitlog/CommitLogReader.java |  6 ++-
 .../EncryptedFileSegmentInputStream.java        | 21 ++++++--
 .../db/commitlog/SegmentReaderTest.java         | 56 ++++++++++++++++++--
 4 files changed, 75 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9343bd40/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f3fc2ca..0695654 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.8
+ * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
  * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
  * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966)
  * Add cross-DC latency metrics (CASSANDRA-11596)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9343bd40/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index 6c4bb60..a914cc9 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@ -199,8 +199,6 @@ public class CommitLogReader
 
                     statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
 
-                    // TODO: Since EncryptedFileSegmentInputStream doesn't implement seek(), we cannot pre-emptively seek
-                    // to the desired offset in the syncSegment before reading the section and deserializing the mutations.
                     readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc);
                     if (!statusTracker.shouldContinue())
                         break;
@@ -254,6 +252,10 @@ public class CommitLogReader
                              ReadStatusTracker statusTracker,
                              CommitLogDescriptor desc) throws IOException
     {
+        // seek rather than deserializing mutation-by-mutation to reach the desired minPosition in this SyncSegment
+        if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position)
+            reader.seek(minPosition.position);
+
         while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
         {
             long mutationStart = reader.getFilePointer();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9343bd40/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
index cd7f7cb..9da3d50 100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
@@ -38,7 +38,7 @@ public class EncryptedFileSegmentInputStream extends FileSegmentInputStream impl
     private final ChunkProvider chunkProvider;
 
     /**
-     * offset the decrypted chunks already processed in this segment.
+     * Offset representing the decrypted chunks already processed in this segment.
      */
     private int totalChunkOffset;
 
@@ -76,8 +76,23 @@ public class EncryptedFileSegmentInputStream extends FileSegmentInputStream impl
 
     public void seek(long position)
     {
-        // implement this when we actually need it
-        throw new UnsupportedOperationException();
+        long bufferPos = position - totalChunkOffset - segmentOffset;
+        while (buffer != null && bufferPos > buffer.capacity())
+        {
+            // rebuffer repeatedly until we have reached desired position
+            buffer.position(buffer.limit());
+
+            // increases totalChunkOffset
+            reBuffer();
+            bufferPos = position - totalChunkOffset - segmentOffset;
+        }
+        if (buffer == null || bufferPos < 0 || bufferPos > buffer.capacity())
+            throw new IllegalArgumentException(
+                    String.format("Unable to seek to position %d in %s (%d bytes) in partial mode",
+                            position,
+                            getPath(),
+                            segmentOffset + expectedLength));
+        buffer.position((int) bufferPos);
     }
 
     public long bytesPastMark(DataPosition mark)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9343bd40/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
index 034566e..88300a1 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
@@ -26,6 +26,8 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Collections;
 import java.util.Random;
+import java.util.function.BiFunction;
+
 import javax.crypto.Cipher;
 
 import org.junit.Assert;
@@ -106,15 +108,61 @@ public class SegmentReaderTest
         }
     }
 
-    private ByteBuffer readBytes(DataInput input, int len) throws IOException
+    private ByteBuffer readBytes(FileDataInput input, int len)
     {
         byte[] buf = new byte[len];
-        input.readFully(buf);
+        try
+        {
+            input.readFully(buf);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
         return ByteBuffer.wrap(buf);
     }
 
+    private ByteBuffer readBytesSeek(FileDataInput input, int len)
+    {
+        byte[] buf = new byte[len];
+
+        /// divide output buffer into 5
+        int[] offsets = new int[] { 0, len / 5, 2 * len / 5, 3 * len / 5, 4 * len / 5, len };
+        
+        //seek offset
+        long inputStart = input.getFilePointer();
+
+        for (int i = 0; i < offsets.length - 1; i++)
+        {
+            try
+            {
+                // seek to beginning of offet
+                input.seek(inputStart + offsets[i]);
+                //read this segment
+                input.readFully(buf, offsets[i], offsets[i + 1] - offsets[i]);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        return ByteBuffer.wrap(buf);
+    }
+
+    @Test
+    public void encryptedSegmenterRead() throws IOException
+    {
+        underlyingEncryptedSegmenterTest((s, t) -> readBytes(s, t));
+    }
+
     @Test
-    public void encryptedSegmenter() throws IOException
+    public void encryptedSegmenterSeek() throws IOException
+    {
+        underlyingEncryptedSegmenterTest((s, t) -> readBytesSeek(s, t));
+    }
+
+    public void underlyingEncryptedSegmenterTest(BiFunction<FileDataInput, Integer, ByteBuffer> readFun)
+            throws IOException
     {
         EncryptionContext context = EncryptionContextGenerator.createContext(true);
         CipherFactory cipherFactory = new CipherFactory(context.getTransparentDataEncryptionOptions());
@@ -140,7 +188,7 @@ public class SegmentReaderTest
 
             // EncryptedSegmenter includes the Sync header length in the syncSegment.endPosition (value)
             Assert.assertEquals(plainTextLength, syncSegment.endPosition - CommitLogSegment.SYNC_MARKER_SIZE);
-            ByteBuffer fileBuffer = readBytes(syncSegment.input, plainTextLength);
+            ByteBuffer fileBuffer = readFun.apply(syncSegment.input, plainTextLength);
             plainTextBuffer.position(0);
             Assert.assertEquals(plainTextBuffer, fileBuffer);
         }