You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/03/24 19:25:54 UTC

[2/5] cassandra git commit: Fix potential data loss in CompressedSequentialWriter (2.1 patch)

Fix potential data loss in CompressedSequentialWriter (2.1 patch)

Patch by jmckenzie; reviewed by tjake for CASSANDRA-8949


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ff82706
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ff82706
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ff82706

Branch: refs/heads/trunk
Commit: 9ff82706f206d135b1795bf84cbcab23915617f4
Parents: 9d2b273
Author: Joshua McKenzie <jm...@apache.org>
Authored: Tue Mar 24 13:19:45 2015 -0500
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Tue Mar 24 13:19:45 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../io/compress/CompressedSequentialWriter.java |   3 +
 .../cassandra/io/compress/LZ4Compressor.java    |   6 +-
 .../CompressedSequentialWriterTest.java         | 125 +++++++++++++++++++
 4 files changed, 134 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ff82706/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 06a3c3f..2000bd0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * Fix potential data loss in CompressedSequentialWriter (CASSANDRA-8949)
  * Make PasswordAuthenticator number of hashing rounds configurable (CASSANDRA-8085)
  * Fix AssertionError when binding nested collections in DELETE (CASSANDRA-8900)
  * Check for overlap with non-early sstables in LCS (CASSANDRA-8739)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ff82706/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 6152c5f..c004232 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -229,6 +229,9 @@ public class CompressedSequentialWriter extends SequentialWriter
         bufferOffset = current - validBufferBytes;
         chunkCount = realMark.nextChunkIndex - 1;
 
+        // mark as dirty so we don't lose the bytes on subsequent reBuffer calls
+        isDirty = true;
+
         // truncate data and index file
         truncate(chunkOffset);
         metadataWriter.resetAndTruncate(realMark.nextChunkIndex - 1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ff82706/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
index 0cf36c1..6f56f30 100644
--- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
+++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java
@@ -23,6 +23,8 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4Factory;
 
@@ -30,7 +32,9 @@ public class LZ4Compressor implements ICompressor
 {
 
     private static final int INTEGER_BYTES = 4;
-    private static final LZ4Compressor instance = new LZ4Compressor();
+
+    @VisibleForTesting
+    public static final LZ4Compressor instance = new LZ4Compressor();
 
     public static LZ4Compressor create(Map<String, String> args)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ff82706/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
new file mode 100644
index 0000000..fe3ba46
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.compress;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.FileMark;
+
+public class CompressedSequentialWriterTest
+{
+    private ICompressor compressor;
+
+    private void runTests(String testName) throws IOException, ConfigurationException
+    {
+        // Test small < 1 chunk data set
+        testWrite(File.createTempFile(testName + "_small", "1"), 25);
+
+        // Test to confirm pipeline w/chunk-aligned data writes works
+        testWrite(File.createTempFile(testName + "_chunkAligned", "1"), CompressionParameters.DEFAULT_CHUNK_LENGTH);
+
+        // Test to confirm pipeline on non-chunk boundaries works
+        testWrite(File.createTempFile(testName + "_large", "1"), CompressionParameters.DEFAULT_CHUNK_LENGTH * 3 + 100);
+    }
+
+    @Test
+    public void testLZ4Writer() throws IOException, ConfigurationException
+    {
+        compressor = LZ4Compressor.instance;
+        runTests("LZ4");
+    }
+
+    @Test
+    public void testDeflateWriter() throws IOException, ConfigurationException
+    {
+        compressor = DeflateCompressor.instance;
+        runTests("Deflate");
+    }
+
+    @Test
+    public void testSnappyWriter() throws IOException, ConfigurationException
+    {
+        compressor = SnappyCompressor.instance;
+        runTests("Snappy");
+    }
+
+    private void testWrite(File f, int bytesToTest) throws IOException, ConfigurationException
+    {
+        try
+        {
+            final String filename = f.getAbsolutePath();
+
+            MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
+            CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(compressor), sstableMetadataCollector);
+
+            byte[] dataPre = new byte[bytesToTest];
+            byte[] rawPost = new byte[bytesToTest];
+            Random r = new Random();
+
+            // Test both write with byte[] and ByteBuffer
+            r.nextBytes(dataPre);
+            r.nextBytes(rawPost);
+
+            writer.write(dataPre);
+            FileMark mark = writer.mark();
+
+            // Write enough garbage to transition chunk
+            for (int i = 0; i < CompressionParameters.DEFAULT_CHUNK_LENGTH; i++)
+            {
+                writer.write((byte)i);
+            }
+            writer.resetAndTruncate(mark);
+            writer.write(rawPost);
+            writer.close();
+
+            assert f.exists();
+            CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length(), true));
+            assertEquals(dataPre.length + rawPost.length, reader.length());
+            byte[] result = new byte[(int)reader.length()];
+
+            reader.readFully(result);
+
+            assert(reader.isEOF());
+            reader.close();
+
+            byte[] fullInput = new byte[bytesToTest * 2];
+            System.arraycopy(dataPre, 0, fullInput, 0, dataPre.length);
+            System.arraycopy(rawPost, 0, fullInput, bytesToTest, rawPost.length);
+            assert Arrays.equals(result, fullInput);
+        }
+        finally
+        {
+            // cleanup
+            if (f.exists())
+                f.delete();
+            File metadata = new File(f + ".metadata");
+            if (metadata.exists())
+                metadata.delete();
+        }
+    }
+}