You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in the conversion to plain text.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/11/18 02:32:10 UTC

[04/15] cassandra git commit: Create compression chunk when sending file only

Create compression chunk when sending file only

patch by yukim; reviewed by Paulo Motta for CASSANDRA-10680


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8385bb63
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8385bb63
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8385bb63

Branch: refs/heads/cassandra-3.0
Commit: 8385bb639ad8a6a86393a05813fd9b0b45876a2e
Parents: 9b97766
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 9 23:01:31 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 17 17:58:37 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../io/compress/CompressionMetadata.java        | 30 ++++++++++++
 .../streaming/messages/FileMessageHeader.java   | 49 ++++++++++++++++++--
 .../streaming/messages/IncomingFileMessage.java |  2 +-
 .../streaming/messages/OutgoingFileMessage.java | 15 ++----
 .../compress/CompressedInputStreamTest.java     |  7 +++
 6 files changed, 87 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 08db386..008d4d4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Create compression chunk for sending file only (CASSANDRA-10680)
  * Make buffered read size configurable (CASSANDRA-10249)
  * Forbid compact clustering column type changes in ALTER TABLE (CASSANDRA-8879)
  * Reject incremental repair with subrange repair (CASSANDRA-10422)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 1dc2df3..0de69a6 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -229,6 +229,36 @@ public class CompressionMetadata
     }
 
     /**
+     * @param sections Collection of sections in uncompressed file, which must be given in ascending order of position (a chunk shared by consecutive sections is counted only once; a section that starts before an already-counted chunk is undercounted).
+     * @return Total compressed size in bytes of the chunks covering the given sections, including each chunk's 4-byte checksum.
+     */
+    public long getTotalSizeForSections(Collection<Pair<Long, Long>> sections)
+    {
+        long size = 0;
+        long lastOffset = -1; // offset of the last chunk already counted; only correct if sections arrive in ascending order
+        for (Pair<Long, Long> section : sections)
+        {
+            int startIndex = (int) (section.left / parameters.chunkLength());
+            int endIndex = (int) (section.right / parameters.chunkLength());
+            endIndex = section.right % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex; // section.right is exclusive: an end exactly on a chunk boundary does not touch the next chunk
+            for (int i = startIndex; i <= endIndex; i++)
+            {
+                long offset = i * 8L; // each entry of chunkOffsets is an 8-byte long
+                long chunkOffset = chunkOffsets.getLong(offset);
+                if (chunkOffset > lastOffset) // skip chunks already counted for an earlier section
+                {
+                    lastOffset = chunkOffset;
+                    long nextChunkOffset = offset + 8 == chunkOffsetsSize
+                                                   ? compressedFileLength // last chunk runs to the end of the compressed file
+                                                   : chunkOffsets.getLong(offset + 8);
+                    size += (nextChunkOffset - chunkOffset); // distance to the next chunk = compressed data + checksum
+                }
+            }
+        }
+        return size;
+    }
+
+    /**
      * @param sections Collection of sections in uncompressed file
      * @return Array of chunks which corresponds to given sections of uncompressed file, sorted by chunk offset
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 284820e..34d9a01 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -37,7 +37,7 @@ import org.apache.cassandra.utils.UUIDSerializer;
  */
 public class FileMessageHeader
 {
-    public static IVersionedSerializer<FileMessageHeader> serializer = new FileMessageHeaderSerializer();
+    public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer();
 
     public final UUID cfId;
     public final int sequenceNumber;
@@ -45,7 +45,13 @@ public class FileMessageHeader
     public final String version;
     public final long estimatedKeys;
     public final List<Pair<Long, Long>> sections;
+    /**
+     * Compression info for SSTable to send. Can be null if SSTable is not compressed.
+     * On sender, this field is always null to avoid holding large number of Chunks.
+     * Use compressionMetadata instead.
+     */
     public final CompressionInfo compressionInfo;
+    private final CompressionMetadata compressionMetadata;
     public final long repairedAt;
 
     public FileMessageHeader(UUID cfId,
@@ -62,9 +68,33 @@ public class FileMessageHeader
         this.estimatedKeys = estimatedKeys;
         this.sections = sections;
         this.compressionInfo = compressionInfo;
+        this.compressionMetadata = null;
+        this.repairedAt = repairedAt;
+    }
+
+    public FileMessageHeader(UUID cfId, // sender-side constructor: keeps CompressionMetadata instead of materialized chunks
+                             int sequenceNumber,
+                             String version,
+                             long estimatedKeys,
+                             List<Pair<Long, Long>> sections,
+                             CompressionMetadata compressionMetadata,
+                             long repairedAt)
+    {
+        this.cfId = cfId;
+        this.sequenceNumber = sequenceNumber;
+        this.version = version;
+        this.estimatedKeys = estimatedKeys;
+        this.sections = sections;
+        this.compressionInfo = null; // chunks are built from compressionMetadata only while serializing, to avoid holding them on heap
+        this.compressionMetadata = compressionMetadata; // null when the SSTable is not compressed
         this.repairedAt = repairedAt;
     }
 
+    public boolean isCompressed() // true if either form of compression info is present
+    {
+        return compressionInfo != null || compressionMetadata != null; // sender sets compressionMetadata; the other constructor (presumably receiver side) sets compressionInfo
+    }
+
     /**
      * @return total file size to transfer in bytes
      */
@@ -77,6 +107,10 @@ public class FileMessageHeader
             for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
                 size += chunk.length + 4; // 4 bytes for CRC
         }
+        else if (compressionMetadata != null)
+        {
+            size = compressionMetadata.getTotalSizeForSections(sections);
+        }
         else
         {
             for (Pair<Long, Long> section : sections)
@@ -94,7 +128,7 @@ public class FileMessageHeader
         sb.append(", version: ").append(version);
         sb.append(", estimated keys: ").append(estimatedKeys);
         sb.append(", transfer size: ").append(size());
-        sb.append(", compressed?: ").append(compressionInfo != null);
+        sb.append(", compressed?: ").append(isCompressed());
         sb.append(", repairedAt: ").append(repairedAt);
         sb.append(')');
         return sb.toString();
@@ -117,9 +151,9 @@ public class FileMessageHeader
         return result;
     }
 
-    static class FileMessageHeaderSerializer implements IVersionedSerializer<FileMessageHeader>
+    static class FileMessageHeaderSerializer
     {
-        public void serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException
+        public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException
         {
             UUIDSerializer.serializer.serialize(header.cfId, out, version);
             out.writeInt(header.sequenceNumber);
@@ -132,8 +166,13 @@ public class FileMessageHeader
                 out.writeLong(section.left);
                 out.writeLong(section.right);
             }
-            CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
+            // construct CompressionInfo here to avoid holding large number of Chunks on heap.
+            CompressionInfo compressionInfo = null;
+            if (header.compressionMetadata != null)
+                compressionInfo = new CompressionInfo(header.compressionMetadata.getChunksForSections(header.sections), header.compressionMetadata.parameters);
+            CompressionInfo.serializer.serialize(compressionInfo, out, version);
             out.writeLong(header.repairedAt);
+            return compressionInfo;
         }
 
         public FileMessageHeader deserialize(DataInput in, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 494af85..cb39275 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -40,7 +40,7 @@ public class IncomingFileMessage extends StreamMessage
         {
             DataInputStream input = new DataInputStream(Channels.newInputStream(in));
             FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version);
-            StreamReader reader = header.compressionInfo == null ? new StreamReader(header, session)
+            StreamReader reader = !header.isCompressed() ? new StreamReader(header, session)
                     : new CompressedStreamReader(header, session);
 
             try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 082e306..71902e1 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -63,18 +63,12 @@ public class OutgoingFileMessage extends StreamMessage
 
         SSTableReader sstable = ref.get();
         filename = sstable.getFilename();
-        CompressionInfo compressionInfo = null;
-        if (sstable.compression)
-        {
-            CompressionMetadata meta = sstable.getCompressionMetadata();
-            compressionInfo = new CompressionInfo(meta.getChunksForSections(sections), meta.parameters);
-        }
         this.header = new FileMessageHeader(sstable.metadata.cfId,
                                             sequenceNumber,
                                             sstable.descriptor.version.toString(),
                                             estimatedKeys,
                                             sections,
-                                            compressionInfo,
+                                            sstable.compression ? sstable.getCompressionMetadata() : null,
                                             repairedAt);
     }
 
@@ -85,13 +79,12 @@ public class OutgoingFileMessage extends StreamMessage
             return;
         }
 
-        FileMessageHeader.serializer.serialize(header, out, version);
+        CompressionInfo compressionInfo = FileMessageHeader.serializer.serialize(header, out, version);
 
         final SSTableReader reader = ref.get();
-        StreamWriter writer = header.compressionInfo == null ?
+        StreamWriter writer = compressionInfo == null ?
                                       new StreamWriter(reader, header.sections, session) :
-                                      new CompressedStreamWriter(reader, header.sections,
-                                                                 header.compressionInfo, session);
+                                      new CompressedStreamWriter(reader, header.sections, compressionInfo, session);
         writer.write(out.getChannel());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 42a83a0..f3007da 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -37,6 +37,8 @@ import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.Pair;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  */
 public class CompressedInputStreamTest
@@ -86,6 +88,11 @@ public class CompressedInputStreamTest
             sections.add(Pair.create(position, position + 8));
         }
         CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
+        long totalSize = comp.getTotalSizeForSections(sections);
+        long expectedSize = 0;
+        for (CompressionMetadata.Chunk c : chunks)
+            expectedSize += c.length + 4;
+        assertEquals(expectedSize, totalSize);
 
         // buffer up only relevant parts of file
         int size = 0;