You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2016/06/21 19:48:49 UTC
cassandra git commit: Implement / integrate FileSegmentInputStream.seek() into CommitLogReader
Repository: cassandra
Updated Branches:
refs/heads/trunk 6a7985d87 -> 9343bd407
Implement / integrate FileSegmentInputStream.seek() into CommitLogReader
Patch by ichaudhry; reviewed by jmckenzie for CASSANDRA-11957
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9343bd40
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9343bd40
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9343bd40
Branch: refs/heads/trunk
Commit: 9343bd4070d69f9c1558656deccfd8e3692c2c80
Parents: 6a7985d
Author: Imran Chaudhry <im...@datastax.com>
Authored: Tue Jun 21 13:20:32 2016 -0400
Committer: Josh McKenzie <jm...@apache.org>
Committed: Tue Jun 21 15:48:00 2016 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLogReader.java | 6 ++-
.../EncryptedFileSegmentInputStream.java | 21 ++++++--
.../db/commitlog/SegmentReaderTest.java | 56 ++++++++++++++++++--
4 files changed, 75 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9343bd40/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f3fc2ca..0695654 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.8
+ * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
* SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
* When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966)
* Add cross-DC latency metrics (CASSANDRA-11596)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9343bd40/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index 6c4bb60..a914cc9 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@ -199,8 +199,6 @@ public class CommitLogReader
statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
- // TODO: Since EncryptedFileSegmentInputStream doesn't implement seek(), we cannot pre-emptively seek
- // to the desired offset in the syncSegment before reading the section and deserializing the mutations.
readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc);
if (!statusTracker.shouldContinue())
break;
@@ -254,6 +252,10 @@ public class CommitLogReader
ReadStatusTracker statusTracker,
CommitLogDescriptor desc) throws IOException
{
+ // seek rather than deserializing mutation-by-mutation to reach the desired minPosition in this SyncSegment
+ if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position)
+ reader.seek(minPosition.position);
+
while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
{
long mutationStart = reader.getFilePointer();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9343bd40/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
index cd7f7cb..9da3d50 100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedFileSegmentInputStream.java
@@ -38,7 +38,7 @@ public class EncryptedFileSegmentInputStream extends FileSegmentInputStream impl
private final ChunkProvider chunkProvider;
/**
- * offset the decrypted chunks already processed in this segment.
+ * Offset representing the decrypted chunks already processed in this segment.
*/
private int totalChunkOffset;
@@ -76,8 +76,23 @@ public class EncryptedFileSegmentInputStream extends FileSegmentInputStream impl
public void seek(long position)
{
- // implement this when we actually need it
- throw new UnsupportedOperationException();
+ long bufferPos = position - totalChunkOffset - segmentOffset;
+ while (buffer != null && bufferPos > buffer.capacity())
+ {
+ // rebuffer repeatedly until we have reached desired position
+ buffer.position(buffer.limit());
+
+ // increases totalChunkOffset
+ reBuffer();
+ bufferPos = position - totalChunkOffset - segmentOffset;
+ }
+ if (buffer == null || bufferPos < 0 || bufferPos > buffer.capacity())
+ throw new IllegalArgumentException(
+ String.format("Unable to seek to position %d in %s (%d bytes) in partial mode",
+ position,
+ getPath(),
+ segmentOffset + expectedLength));
+ buffer.position((int) bufferPos);
}
public long bytesPastMark(DataPosition mark)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9343bd40/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
index 034566e..88300a1 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java
@@ -26,6 +26,8 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Collections;
import java.util.Random;
+import java.util.function.BiFunction;
+
import javax.crypto.Cipher;
import org.junit.Assert;
@@ -106,15 +108,61 @@ public class SegmentReaderTest
}
}
- private ByteBuffer readBytes(DataInput input, int len) throws IOException
+ private ByteBuffer readBytes(FileDataInput input, int len)
{
byte[] buf = new byte[len];
- input.readFully(buf);
+ try
+ {
+ input.readFully(buf);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
return ByteBuffer.wrap(buf);
}
+ private ByteBuffer readBytesSeek(FileDataInput input, int len)
+ {
+ byte[] buf = new byte[len];
+
+ /// divide output buffer into 5
+ int[] offsets = new int[] { 0, len / 5, 2 * len / 5, 3 * len / 5, 4 * len / 5, len };
+
+ // starting position of the input; each seek below is relative to it
+ long inputStart = input.getFilePointer();
+
+ for (int i = 0; i < offsets.length - 1; i++)
+ {
+ try
+ {
+ // seek to beginning of offset
+ input.seek(inputStart + offsets[i]);
+ //read this segment
+ input.readFully(buf, offsets[i], offsets[i + 1] - offsets[i]);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ return ByteBuffer.wrap(buf);
+ }
+
+ @Test
+ public void encryptedSegmenterRead() throws IOException
+ {
+ underlyingEncryptedSegmenterTest((s, t) -> readBytes(s, t));
+ }
+
@Test
- public void encryptedSegmenter() throws IOException
+ public void encryptedSegmenterSeek() throws IOException
+ {
+ underlyingEncryptedSegmenterTest((s, t) -> readBytesSeek(s, t));
+ }
+
+ public void underlyingEncryptedSegmenterTest(BiFunction<FileDataInput, Integer, ByteBuffer> readFun)
+ throws IOException
{
EncryptionContext context = EncryptionContextGenerator.createContext(true);
CipherFactory cipherFactory = new CipherFactory(context.getTransparentDataEncryptionOptions());
@@ -140,7 +188,7 @@ public class SegmentReaderTest
// EncryptedSegmenter includes the Sync header length in the syncSegment.endPosition (value)
Assert.assertEquals(plainTextLength, syncSegment.endPosition - CommitLogSegment.SYNC_MARKER_SIZE);
- ByteBuffer fileBuffer = readBytes(syncSegment.input, plainTextLength);
+ ByteBuffer fileBuffer = readFun.apply(syncSegment.input, plainTextLength);
plainTextBuffer.position(0);
Assert.assertEquals(plainTextBuffer, fileBuffer);
}