You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/11/18 02:32:10 UTC
[04/15] cassandra git commit: Create compression chunk when sending file only
Create compression chunk when sending file only
patch by yukim; reviewed by Paulo Motta for CASSANDRA-10680
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8385bb63
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8385bb63
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8385bb63
Branch: refs/heads/cassandra-3.0
Commit: 8385bb639ad8a6a86393a05813fd9b0b45876a2e
Parents: 9b97766
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 9 23:01:31 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 17 17:58:37 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../io/compress/CompressionMetadata.java | 30 ++++++++++++
.../streaming/messages/FileMessageHeader.java | 49 ++++++++++++++++++--
.../streaming/messages/IncomingFileMessage.java | 2 +-
.../streaming/messages/OutgoingFileMessage.java | 15 ++----
.../compress/CompressedInputStreamTest.java | 7 +++
6 files changed, 87 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 08db386..008d4d4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Create compression chunk for sending file only (CASSANDRA-10680)
* Make buffered read size configurable (CASSANDRA-10249)
* Forbid compact clustering column type changes in ALTER TABLE (CASSANDRA-8879)
* Reject incremental repair with subrange repair (CASSANDRA-10422)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 1dc2df3..0de69a6 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -229,6 +229,36 @@ public class CompressionMetadata
}
/**
+ * @param sections Collection of sections in uncompressed file. Should not contain sections that overlap each other.
+ * @return Total chunk size in bytes for given sections including checksum.
+ */
+ public long getTotalSizeForSections(Collection<Pair<Long, Long>> sections)
+ {
+ long size = 0;
+ long lastOffset = -1;
+ for (Pair<Long, Long> section : sections)
+ {
+ int startIndex = (int) (section.left / parameters.chunkLength());
+ int endIndex = (int) (section.right / parameters.chunkLength());
+ endIndex = section.right % parameters.chunkLength() == 0 ? endIndex - 1 : endIndex;
+ for (int i = startIndex; i <= endIndex; i++)
+ {
+ long offset = i * 8L;
+ long chunkOffset = chunkOffsets.getLong(offset);
+ if (chunkOffset > lastOffset)
+ {
+ lastOffset = chunkOffset;
+ long nextChunkOffset = offset + 8 == chunkOffsetsSize
+ ? compressedFileLength
+ : chunkOffsets.getLong(offset + 8);
+ size += (nextChunkOffset - chunkOffset);
+ }
+ }
+ }
+ return size;
+ }
+
+ /**
* @param sections Collection of sections in uncompressed file
* @return Array of chunks which corresponds to given sections of uncompressed file, sorted by chunk offset
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 284820e..34d9a01 100644
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -37,7 +37,7 @@ import org.apache.cassandra.utils.UUIDSerializer;
*/
public class FileMessageHeader
{
- public static IVersionedSerializer<FileMessageHeader> serializer = new FileMessageHeaderSerializer();
+ public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer();
public final UUID cfId;
public final int sequenceNumber;
@@ -45,7 +45,13 @@ public class FileMessageHeader
public final String version;
public final long estimatedKeys;
public final List<Pair<Long, Long>> sections;
+ /**
+ * Compression info for SSTable to send. Can be null if SSTable is not compressed.
+ * On sender, this field is always null to avoid holding large number of Chunks.
+ * Use compressionMetadata instead.
+ */
public final CompressionInfo compressionInfo;
+ private final CompressionMetadata compressionMetadata;
public final long repairedAt;
public FileMessageHeader(UUID cfId,
@@ -62,9 +68,33 @@ public class FileMessageHeader
this.estimatedKeys = estimatedKeys;
this.sections = sections;
this.compressionInfo = compressionInfo;
+ this.compressionMetadata = null;
+ this.repairedAt = repairedAt;
+ }
+
+ public FileMessageHeader(UUID cfId,
+ int sequenceNumber,
+ String version,
+ long estimatedKeys,
+ List<Pair<Long, Long>> sections,
+ CompressionMetadata compressionMetadata,
+ long repairedAt)
+ {
+ this.cfId = cfId;
+ this.sequenceNumber = sequenceNumber;
+ this.version = version;
+ this.estimatedKeys = estimatedKeys;
+ this.sections = sections;
+ this.compressionInfo = null;
+ this.compressionMetadata = compressionMetadata;
this.repairedAt = repairedAt;
}
+ public boolean isCompressed()
+ {
+ return compressionInfo != null || compressionMetadata != null;
+ }
+
/**
* @return total file size to transfer in bytes
*/
@@ -77,6 +107,10 @@ public class FileMessageHeader
for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
size += chunk.length + 4; // 4 bytes for CRC
}
+ else if (compressionMetadata != null)
+ {
+ size = compressionMetadata.getTotalSizeForSections(sections);
+ }
else
{
for (Pair<Long, Long> section : sections)
@@ -94,7 +128,7 @@ public class FileMessageHeader
sb.append(", version: ").append(version);
sb.append(", estimated keys: ").append(estimatedKeys);
sb.append(", transfer size: ").append(size());
- sb.append(", compressed?: ").append(compressionInfo != null);
+ sb.append(", compressed?: ").append(isCompressed());
sb.append(", repairedAt: ").append(repairedAt);
sb.append(')');
return sb.toString();
@@ -117,9 +151,9 @@ public class FileMessageHeader
return result;
}
- static class FileMessageHeaderSerializer implements IVersionedSerializer<FileMessageHeader>
+ static class FileMessageHeaderSerializer
{
- public void serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException
+ public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException
{
UUIDSerializer.serializer.serialize(header.cfId, out, version);
out.writeInt(header.sequenceNumber);
@@ -132,8 +166,13 @@ public class FileMessageHeader
out.writeLong(section.left);
out.writeLong(section.right);
}
- CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
+ // construct CompressionInfo here to avoid holding large number of Chunks on heap.
+ CompressionInfo compressionInfo = null;
+ if (header.compressionMetadata != null)
+ compressionInfo = new CompressionInfo(header.compressionMetadata.getChunksForSections(header.sections), header.compressionMetadata.parameters);
+ CompressionInfo.serializer.serialize(compressionInfo, out, version);
out.writeLong(header.repairedAt);
+ return compressionInfo;
}
public FileMessageHeader deserialize(DataInput in, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 494af85..cb39275 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -40,7 +40,7 @@ public class IncomingFileMessage extends StreamMessage
{
DataInputStream input = new DataInputStream(Channels.newInputStream(in));
FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version);
- StreamReader reader = header.compressionInfo == null ? new StreamReader(header, session)
+ StreamReader reader = !header.isCompressed() ? new StreamReader(header, session)
: new CompressedStreamReader(header, session);
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
index 082e306..71902e1 100644
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
@@ -63,18 +63,12 @@ public class OutgoingFileMessage extends StreamMessage
SSTableReader sstable = ref.get();
filename = sstable.getFilename();
- CompressionInfo compressionInfo = null;
- if (sstable.compression)
- {
- CompressionMetadata meta = sstable.getCompressionMetadata();
- compressionInfo = new CompressionInfo(meta.getChunksForSections(sections), meta.parameters);
- }
this.header = new FileMessageHeader(sstable.metadata.cfId,
sequenceNumber,
sstable.descriptor.version.toString(),
estimatedKeys,
sections,
- compressionInfo,
+ sstable.compression ? sstable.getCompressionMetadata() : null,
repairedAt);
}
@@ -85,13 +79,12 @@ public class OutgoingFileMessage extends StreamMessage
return;
}
- FileMessageHeader.serializer.serialize(header, out, version);
+ CompressionInfo compressionInfo = FileMessageHeader.serializer.serialize(header, out, version);
final SSTableReader reader = ref.get();
- StreamWriter writer = header.compressionInfo == null ?
+ StreamWriter writer = compressionInfo == null ?
new StreamWriter(reader, header.sections, session) :
- new CompressedStreamWriter(reader, header.sections,
- header.compressionInfo, session);
+ new CompressedStreamWriter(reader, header.sections, compressionInfo, session);
writer.write(out.getChannel());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8385bb63/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 42a83a0..f3007da 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -37,6 +37,8 @@ import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.Pair;
+import static org.junit.Assert.assertEquals;
+
/**
*/
public class CompressedInputStreamTest
@@ -86,6 +88,11 @@ public class CompressedInputStreamTest
sections.add(Pair.create(position, position + 8));
}
CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
+ long totalSize = comp.getTotalSizeForSections(sections);
+ long expectedSize = 0;
+ for (CompressionMetadata.Chunk c : chunks)
+ expectedSize += c.length + 4;
+ assertEquals(expectedSize, totalSize);
// buffer up only relevant parts of file
int size = 0;