You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/01/15 19:52:53 UTC
[1/3] git commit: fix streaming progress report for compressed files;
patch by yukim reviewed by mkjellman for CASSANDRA-5130
fix streaming progress report for compressed files; patch by yukim reviewed by mkjellman for CASSANDRA-5130
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a548f05d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a548f05d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a548f05d
Branch: refs/heads/cassandra-1.2
Commit: a548f05dea3b845be4bbc2b532208ce8ee6b8b7f
Parents: 7ca9b69
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Jan 15 12:52:17 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jan 15 12:52:17 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/IncomingStreamReader.java | 9 +++++----
.../streaming/compress/CompressedInputStream.java | 8 ++++----
3 files changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a548f05d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 32ed9fa..f57f656 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -35,6 +35,7 @@
(CASSANDRA-5144)
* Fix inserting empty maps (CASSANDRA-5141)
* Don't remove tokens from System table for node we know (CASSANDRA-5121)
+ * fix streaming progress report for compresed files (CASSANDRA-5130)
Merged from 1.1:
* Simplify CompressedRandomAccessReader to work around JDK FD bug (CASSANDRA-5088)
* Improve handling a changing target throttle rate mid-compaction (CASSANDRA-5087)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a548f05d/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index 656a99d..8036afd 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -181,10 +181,11 @@ public class IncomingStreamReader
}
bytesRead += in.getBytesRead();
- // when compressed, report total bytes of decompressed chunks since remoteFile.size is the sum of chunks transferred
- remoteFile.progress += remoteFile.compressionInfo != null
- ? ((CompressedInputStream) underliningStream).uncompressedBytes()
- : in.getBytesRead();
+ // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
+ if (remoteFile.compressionInfo != null)
+ remoteFile.progress = ((CompressedInputStream) underliningStream).getTotalCompressedBytesRead();
+ else
+ remoteFile.progress += in.getBytesRead();
totalBytesRead += in.getBytesRead();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a548f05d/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index c226cb6..6fe1793 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -54,7 +54,7 @@ public class CompressedInputStream extends InputStream
// raw checksum bytes
private final byte[] checksumBytes = new byte[4];
- private long uncompressedBytes;
+ private long totalCompressedBytesRead;
/**
* @param source Input source to read compressed data from
@@ -99,7 +99,7 @@ public class CompressedInputStream extends InputStream
{
// uncompress
validBufferBytes = info.parameters.sstableCompressor.uncompress(compressed, 0, compressed.length - checksumBytes.length, buffer, 0);
- uncompressedBytes += validBufferBytes;
+ totalCompressedBytesRead += compressed.length;
// validate crc randomly
if (info.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble())
@@ -118,9 +118,9 @@ public class CompressedInputStream extends InputStream
bufferOffset = current & ~(buffer.length - 1);
}
- public long uncompressedBytes()
+ public long getTotalCompressedBytesRead()
{
- return uncompressedBytes;
+ return totalCompressedBytesRead;
}
static class Reader extends WrappedRunnable