You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/11/03 17:08:00 UTC
[01/10] cassandra git commit: Fix streaming to catch exception so
retry not fail
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 986a1a769 -> 068614ccc
refs/heads/cassandra-2.2 82aa7969c -> a549bd085
refs/heads/cassandra-3.0 0ad0de139 -> 87f5e2e39
refs/heads/trunk 468a4c58c -> f505e8bdf
Fix streaming to catch exception so retry not fail
patch by yukim; reviewed by Paulo Motta for CASSANDRA-10557
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/068614cc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/068614cc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/068614cc
Branch: refs/heads/cassandra-2.1
Commit: 068614ccc7ba6c5b8ccb50a0840af57bb45b4b36
Parents: 986a1a7
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Oct 20 14:25:55 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:33:37 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/streaming/StreamReader.java | 16 ++++++++++++++--
.../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d22b91..4c24b35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
* Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
* Support encrypted and plain traffic on the same port (CASSANDRA-10559)
* Do STCS in DTCS windows (CASSANDRA-10276)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index c96a925..5389a80 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -89,11 +89,12 @@ public class StreamReader
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
+ SSTableWriter writer = null;
try
{
+ writer = createWriter(cfs, totalSize, repairedAt);
while (in.getBytesRead() < totalSize)
{
writeRow(writer, in, cfs);
@@ -104,7 +105,18 @@ public class StreamReader
}
catch (Throwable e)
{
- writer.abort();
+ if (writer != null)
+ {
+ try
+ {
+ writer.abort();
+ }
+ catch (Throwable e2)
+ {
+ // add abort error to original and continue so we can drain unread stream
+ e.addSuppressed(e2);
+ }
+ }
drain(dis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index fb2599f..0529496 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -72,12 +72,12 @@ public class CompressedStreamReader extends StreamReader
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
-
CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+ SSTableWriter writer = null;
try
{
+ writer = createWriter(cfs, totalSize, repairedAt);
for (Pair<Long, Long> section : sections)
{
long length = section.right - section.left;
@@ -95,7 +95,18 @@ public class CompressedStreamReader extends StreamReader
}
catch (Throwable e)
{
- writer.abort();
+ if (writer != null)
+ {
+ try
+ {
+ writer.abort();
+ }
+ catch (Throwable e2)
+ {
+ // add abort error to original and continue so we can drain unread stream
+ e.addSuppressed(e2);
+ }
+ }
drain(cis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
[07/10] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a549bd08
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a549bd08
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a549bd08
Branch: refs/heads/trunk
Commit: a549bd085f5244b3271249ce881ac30dd3f27553
Parents: 82aa796 068614c
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 3 09:40:49 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:40:49 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/streaming/StreamReader.java | 17 ++++++++++++++---
.../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
3 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac997f2,4c24b35..5c23acf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
-2.1.12
+2.2.4
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+Merged from 2.1:
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
* Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
* Support encrypted and plain traffic on the same port (CASSANDRA-10559)
* Do STCS in DTCS windows (CASSANDRA-10276)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 1a3980d,5389a80..1ccebb0
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -94,12 -89,12 +94,12 @@@ public class StreamReade
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
-
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
+ SSTableWriter writer = null;
try
{
- writer = createWriter(cfs, totalSize, repairedAt);
++ writer = createWriter(cfs, totalSize, repairedAt, format);
while (in.getBytesRead() < totalSize)
{
writeRow(writer, in, cfs);
@@@ -108,9 -102,21 +108,20 @@@
session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
}
return writer;
- }
- catch (Throwable e)
+ } catch (Throwable e)
{
- writer.abort();
+ if (writer != null)
+ {
+ try
+ {
+ writer.abort();
+ }
+ catch (Throwable e2)
+ {
+ // add abort error to original and continue so we can drain unread stream
+ e.addSuppressed(e2);
+ }
+ }
drain(dis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 1936a94,0529496..facb906
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -75,17 -72,15 +75,17 @@@ public class CompressedStreamReader ext
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
-
- CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
+ CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+ SSTableWriter writer = null;
try
{
- writer = createWriter(cfs, totalSize, repairedAt);
++ writer = createWriter(cfs, totalSize, repairedAt, format);
for (Pair<Long, Long> section : sections)
{
- long length = section.right - section.left;
+ assert cis.getTotalCompressedBytesRead() <= totalSize;
+ int sectionLength = (int) (section.right - section.left);
+
// skip to beginning of section inside chunk
cis.position(section.left);
in.reset(0);
[06/10] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a549bd08
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a549bd08
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a549bd08
Branch: refs/heads/cassandra-2.2
Commit: a549bd085f5244b3271249ce881ac30dd3f27553
Parents: 82aa796 068614c
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 3 09:40:49 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:40:49 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/streaming/StreamReader.java | 17 ++++++++++++++---
.../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
3 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac997f2,4c24b35..5c23acf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
-2.1.12
+2.2.4
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+Merged from 2.1:
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
* Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
* Support encrypted and plain traffic on the same port (CASSANDRA-10559)
* Do STCS in DTCS windows (CASSANDRA-10276)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 1a3980d,5389a80..1ccebb0
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -94,12 -89,12 +94,12 @@@ public class StreamReade
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
-
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
+ SSTableWriter writer = null;
try
{
- writer = createWriter(cfs, totalSize, repairedAt);
++ writer = createWriter(cfs, totalSize, repairedAt, format);
while (in.getBytesRead() < totalSize)
{
writeRow(writer, in, cfs);
@@@ -108,9 -102,21 +108,20 @@@
session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
}
return writer;
- }
- catch (Throwable e)
+ } catch (Throwable e)
{
- writer.abort();
+ if (writer != null)
+ {
+ try
+ {
+ writer.abort();
+ }
+ catch (Throwable e2)
+ {
+ // add abort error to original and continue so we can drain unread stream
+ e.addSuppressed(e2);
+ }
+ }
drain(dis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 1936a94,0529496..facb906
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -75,17 -72,15 +75,17 @@@ public class CompressedStreamReader ext
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
-
- CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
+ CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+ SSTableWriter writer = null;
try
{
- writer = createWriter(cfs, totalSize, repairedAt);
++ writer = createWriter(cfs, totalSize, repairedAt, format);
for (Pair<Long, Long> section : sections)
{
- long length = section.right - section.left;
+ assert cis.getTotalCompressedBytesRead() <= totalSize;
+ int sectionLength = (int) (section.right - section.left);
+
// skip to beginning of section inside chunk
cis.position(section.left);
in.reset(0);
[04/10] cassandra git commit: Fix streaming to catch exception so
retry not fail
Posted by yu...@apache.org.
Fix streaming to catch exception so retry not fail
patch by yukim; reviewed by Paulo Motta for CASSANDRA-10557
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/068614cc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/068614cc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/068614cc
Branch: refs/heads/trunk
Commit: 068614ccc7ba6c5b8ccb50a0840af57bb45b4b36
Parents: 986a1a7
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Oct 20 14:25:55 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:33:37 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/streaming/StreamReader.java | 16 ++++++++++++++--
.../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d22b91..4c24b35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
* Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
* Support encrypted and plain traffic on the same port (CASSANDRA-10559)
* Do STCS in DTCS windows (CASSANDRA-10276)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index c96a925..5389a80 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -89,11 +89,12 @@ public class StreamReader
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
+ SSTableWriter writer = null;
try
{
+ writer = createWriter(cfs, totalSize, repairedAt);
while (in.getBytesRead() < totalSize)
{
writeRow(writer, in, cfs);
@@ -104,7 +105,18 @@ public class StreamReader
}
catch (Throwable e)
{
- writer.abort();
+ if (writer != null)
+ {
+ try
+ {
+ writer.abort();
+ }
+ catch (Throwable e2)
+ {
+ // add abort error to original and continue so we can drain unread stream
+ e.addSuppressed(e2);
+ }
+ }
drain(dis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index fb2599f..0529496 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -72,12 +72,12 @@ public class CompressedStreamReader extends StreamReader
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
-
CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+ SSTableWriter writer = null;
try
{
+ writer = createWriter(cfs, totalSize, repairedAt);
for (Pair<Long, Long> section : sections)
{
long length = section.right - section.left;
@@ -95,7 +95,18 @@ public class CompressedStreamReader extends StreamReader
}
catch (Throwable e)
{
- writer.abort();
+ if (writer != null)
+ {
+ try
+ {
+ writer.abort();
+ }
+ catch (Throwable e2)
+ {
+ // add abort error to original and continue so we can drain unread stream
+ e.addSuppressed(e2);
+ }
+ }
drain(cis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
[05/10] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a549bd08
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a549bd08
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a549bd08
Branch: refs/heads/cassandra-3.0
Commit: a549bd085f5244b3271249ce881ac30dd3f27553
Parents: 82aa796 068614c
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 3 09:40:49 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:40:49 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/streaming/StreamReader.java | 17 ++++++++++++++---
.../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
3 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac997f2,4c24b35..5c23acf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
-2.1.12
+2.2.4
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+Merged from 2.1:
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
* Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
* Support encrypted and plain traffic on the same port (CASSANDRA-10559)
* Do STCS in DTCS windows (CASSANDRA-10276)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 1a3980d,5389a80..1ccebb0
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -94,12 -89,12 +94,12 @@@ public class StreamReade
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
-
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
+ SSTableWriter writer = null;
try
{
- writer = createWriter(cfs, totalSize, repairedAt);
++ writer = createWriter(cfs, totalSize, repairedAt, format);
while (in.getBytesRead() < totalSize)
{
writeRow(writer, in, cfs);
@@@ -108,9 -102,21 +108,20 @@@
session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
}
return writer;
- }
- catch (Throwable e)
+ } catch (Throwable e)
{
- writer.abort();
+ if (writer != null)
+ {
+ try
+ {
+ writer.abort();
+ }
+ catch (Throwable e2)
+ {
+ // add abort error to original and continue so we can drain unread stream
+ e.addSuppressed(e2);
+ }
+ }
drain(dis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 1936a94,0529496..facb906
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -75,17 -72,15 +75,17 @@@ public class CompressedStreamReader ext
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
-
- CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
+ CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+ SSTableWriter writer = null;
try
{
- writer = createWriter(cfs, totalSize, repairedAt);
++ writer = createWriter(cfs, totalSize, repairedAt, format);
for (Pair<Long, Long> section : sections)
{
- long length = section.right - section.left;
+ assert cis.getTotalCompressedBytesRead() <= totalSize;
+ int sectionLength = (int) (section.right - section.left);
+
// skip to beginning of section inside chunk
cis.position(section.left);
in.reset(0);
[03/10] cassandra git commit: Fix streaming to catch exception so
retry not fail
Posted by yu...@apache.org.
Fix streaming to catch exception so retry not fail
patch by yukim; reviewed by Paulo Motta for CASSANDRA-10557
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/068614cc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/068614cc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/068614cc
Branch: refs/heads/cassandra-3.0
Commit: 068614ccc7ba6c5b8ccb50a0840af57bb45b4b36
Parents: 986a1a7
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Oct 20 14:25:55 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:33:37 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/streaming/StreamReader.java | 16 ++++++++++++++--
.../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d22b91..4c24b35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
* Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
* Support encrypted and plain traffic on the same port (CASSANDRA-10559)
* Do STCS in DTCS windows (CASSANDRA-10276)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index c96a925..5389a80 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -89,11 +89,12 @@ public class StreamReader
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
+ SSTableWriter writer = null;
try
{
+ writer = createWriter(cfs, totalSize, repairedAt);
while (in.getBytesRead() < totalSize)
{
writeRow(writer, in, cfs);
@@ -104,7 +105,18 @@ public class StreamReader
}
catch (Throwable e)
{
- writer.abort();
+ if (writer != null)
+ {
+ try
+ {
+ writer.abort();
+ }
+ catch (Throwable e2)
+ {
+ // add abort error to original and continue so we can drain unread stream
+ e.addSuppressed(e2);
+ }
+ }
drain(dis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index fb2599f..0529496 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -72,12 +72,12 @@ public class CompressedStreamReader extends StreamReader
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
-
CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+ SSTableWriter writer = null;
try
{
+ writer = createWriter(cfs, totalSize, repairedAt);
for (Pair<Long, Long> section : sections)
{
long length = section.right - section.left;
@@ -95,7 +95,18 @@ public class CompressedStreamReader extends StreamReader
}
catch (Throwable e)
{
- writer.abort();
+ if (writer != null)
+ {
+ try
+ {
+ writer.abort();
+ }
+ catch (Throwable e2)
+ {
+ // add abort error to original and continue so we can drain unread stream
+ e.addSuppressed(e2);
+ }
+ }
drain(cis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
[02/10] cassandra git commit: Fix streaming to catch exception so
retry not fail
Posted by yu...@apache.org.
Fix streaming to catch exception so retry not fail
patch by yukim; reviewed by Paulo Motta for CASSANDRA-10557
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/068614cc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/068614cc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/068614cc
Branch: refs/heads/cassandra-2.2
Commit: 068614ccc7ba6c5b8ccb50a0840af57bb45b4b36
Parents: 986a1a7
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Oct 20 14:25:55 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:33:37 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/streaming/StreamReader.java | 16 ++++++++++++++--
.../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d22b91..4c24b35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
* Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
* Support encrypted and plain traffic on the same port (CASSANDRA-10559)
* Do STCS in DTCS windows (CASSANDRA-10276)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index c96a925..5389a80 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -89,11 +89,12 @@ public class StreamReader
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
+ SSTableWriter writer = null;
try
{
+ writer = createWriter(cfs, totalSize, repairedAt);
while (in.getBytesRead() < totalSize)
{
writeRow(writer, in, cfs);
@@ -104,7 +105,18 @@ public class StreamReader
}
catch (Throwable e)
{
- writer.abort();
+ if (writer != null)
+ {
+ try
+ {
+ writer.abort();
+ }
+ catch (Throwable e2)
+ {
+ // add abort error to original and continue so we can drain unread stream
+ e.addSuppressed(e2);
+ }
+ }
drain(dis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index fb2599f..0529496 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -72,12 +72,12 @@ public class CompressedStreamReader extends StreamReader
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
-
CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+ SSTableWriter writer = null;
try
{
+ writer = createWriter(cfs, totalSize, repairedAt);
for (Pair<Long, Long> section : sections)
{
long length = section.right - section.left;
@@ -95,7 +95,18 @@ public class CompressedStreamReader extends StreamReader
}
catch (Throwable e)
{
- writer.abort();
+ if (writer != null)
+ {
+ try
+ {
+ writer.abort();
+ }
+ catch (Throwable e2)
+ {
+ // add abort error to original and continue so we can drain unread stream
+ e.addSuppressed(e2);
+ }
+ }
drain(cis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
[10/10] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f505e8bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f505e8bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f505e8bd
Branch: refs/heads/trunk
Commit: f505e8bdfe1edc13a3dbdb60ccf5d8bc9b710cd3
Parents: 468a4c5 87f5e2e
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 3 10:03:40 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 10:03:40 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/streaming/StreamReader.java | 12 ++++++++----
.../streaming/compress/CompressedStreamReader.java | 11 ++++++++---
3 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f505e8bd/CHANGES.txt
----------------------------------------------------------------------
[08/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87f5e2e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87f5e2e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87f5e2e3
Branch: refs/heads/cassandra-3.0
Commit: 87f5e2e39c1003c36eba97a92721920f87db3fed
Parents: 0ad0de1 a549bd0
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 3 10:03:34 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 10:03:34 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/streaming/StreamReader.java | 12 ++++++++----
.../streaming/compress/CompressedStreamReader.java | 11 ++++++++---
3 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1724f01,5c23acf..e0208c6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,12 +1,27 @@@
-2.2.4
- * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581)
+3.0
+ * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602)
+ * Don't use 'names query' read path for counters (CASSANDRA-10572)
+ * Fix backward compatibility for counters (CASSANDRA-10470)
+ * Remove memory_allocator paramter from cassandra.yaml (CASSANDRA-10581)
+ * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604)
+ * Fix thrift cas operations with defined columns (CASSANDRA-10576)
+ * Fix PartitionUpdate.operationCount()for updates with static column operations (CASSANDRA-10606)
+ * Fix thrift get() queries with defined columns (CASSANDRA-10586)
+ * Fix marking of indexes as built and removed (CASSANDRA-10601)
+ * Skip initialization of non-registered 2i instances, remove Index::getIndexName (CASSANDRA-10595)
+ * Fix batches on multiple tables (CASSANDRA-10554)
+ * Ensure compaction options are validated when updating KeyspaceMetadata (CASSANDRA-10569)
+ * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
+ * Remove token generator (CASSANDRA-5261)
+ * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562)
+ * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421)
+ * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360)
+ * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367)
+Merged from 2.2:
* Expose phi values from failure detector via JMX and tweak debug
and trace logging (CASSANDRA-9526)
- * Fix RangeNamesQueryPager (CASSANDRA-10509)
- * Deprecate Pig support (CASSANDRA-10542)
- * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
Merged from 2.1:
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
* Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
* Support encrypted and plain traffic on the same port (CASSANDRA-10559)
* Do STCS in DTCS windows (CASSANDRA-10276)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 879491e,1ccebb0..6169494
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -97,25 -94,34 +97,29 @@@ public class StreamReade
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
-
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
- SSTableWriter writer = null;
+ StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
++ SSTableMultiWriter writer = null;
try
{
+ writer = createWriter(cfs, totalSize, repairedAt, format);
while (in.getBytesRead() < totalSize)
{
- writeRow(writer, in, cfs);
-
+ writePartition(deserializer, writer, cfs);
// TODO move this to BytesReadTracker
session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
}
return writer;
- } catch (Throwable e)
+ }
+ catch (Throwable e)
{
- SSTableMultiWriter.abortOrDie(writer);
-
+ if (writer != null)
+ {
- try
- {
- writer.abort();
- }
- catch (Throwable e2)
- {
- // add abort error to original and continue so we can drain unread stream
- e.addSuppressed(e2);
- }
++ Throwable e2 = writer.abort(null);
++ // add abort error to original and continue so we can drain unread stream
++ e.addSuppressed(e2);
+ }
drain(dis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 30cafef,facb906..fca6aa7
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -74,14 -75,12 +74,14 @@@ public class CompressedStreamReader ext
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
-
- CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
+ CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
+ inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
- SSTableWriter writer = null;
+ StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
++ SSTableMultiWriter writer = null;
try
{
+ writer = createWriter(cfs, totalSize, repairedAt, format);
for (Pair<Long, Long> section : sections)
{
assert cis.getTotalCompressedBytesRead() <= totalSize;
@@@ -102,7 -102,18 +102,12 @@@
}
catch (Throwable e)
{
- SSTableMultiWriter.abortOrDie(writer);
+ if (writer != null)
+ {
- try
- {
- writer.abort();
- }
- catch (Throwable e2)
- {
- // add abort error to original and continue so we can drain unread stream
- e.addSuppressed(e2);
- }
++ Throwable e2 = writer.abort(null);
++ // add abort error to original and continue so we can drain unread stream
++ e.addSuppressed(e2);
+ }
drain(cis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
[09/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87f5e2e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87f5e2e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87f5e2e3
Branch: refs/heads/trunk
Commit: 87f5e2e39c1003c36eba97a92721920f87db3fed
Parents: 0ad0de1 a549bd0
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 3 10:03:34 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 10:03:34 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/streaming/StreamReader.java | 12 ++++++++----
.../streaming/compress/CompressedStreamReader.java | 11 ++++++++---
3 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1724f01,5c23acf..e0208c6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,12 +1,27 @@@
-2.2.4
- * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581)
+3.0
+ * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602)
+ * Don't use 'names query' read path for counters (CASSANDRA-10572)
+ * Fix backward compatibility for counters (CASSANDRA-10470)
+ * Remove memory_allocator paramter from cassandra.yaml (CASSANDRA-10581)
+ * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604)
+ * Fix thrift cas operations with defined columns (CASSANDRA-10576)
+ * Fix PartitionUpdate.operationCount()for updates with static column operations (CASSANDRA-10606)
+ * Fix thrift get() queries with defined columns (CASSANDRA-10586)
+ * Fix marking of indexes as built and removed (CASSANDRA-10601)
+ * Skip initialization of non-registered 2i instances, remove Index::getIndexName (CASSANDRA-10595)
+ * Fix batches on multiple tables (CASSANDRA-10554)
+ * Ensure compaction options are validated when updating KeyspaceMetadata (CASSANDRA-10569)
+ * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
+ * Remove token generator (CASSANDRA-5261)
+ * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562)
+ * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421)
+ * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360)
+ * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367)
+Merged from 2.2:
* Expose phi values from failure detector via JMX and tweak debug
and trace logging (CASSANDRA-9526)
- * Fix RangeNamesQueryPager (CASSANDRA-10509)
- * Deprecate Pig support (CASSANDRA-10542)
- * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
Merged from 2.1:
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
* Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
* Support encrypted and plain traffic on the same port (CASSANDRA-10559)
* Do STCS in DTCS windows (CASSANDRA-10276)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 879491e,1ccebb0..6169494
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -97,25 -94,34 +97,29 @@@ public class StreamReade
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
-
DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
BytesReadTracker in = new BytesReadTracker(dis);
- SSTableWriter writer = null;
+ StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
++ SSTableMultiWriter writer = null;
try
{
+ writer = createWriter(cfs, totalSize, repairedAt, format);
while (in.getBytesRead() < totalSize)
{
- writeRow(writer, in, cfs);
-
+ writePartition(deserializer, writer, cfs);
// TODO move this to BytesReadTracker
session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
}
return writer;
- } catch (Throwable e)
+ }
+ catch (Throwable e)
{
- SSTableMultiWriter.abortOrDie(writer);
-
+ if (writer != null)
+ {
- try
- {
- writer.abort();
- }
- catch (Throwable e2)
- {
- // add abort error to original and continue so we can drain unread stream
- e.addSuppressed(e2);
- }
++ Throwable e2 = writer.abort(null);
++ // add abort error to original and continue so we can drain unread stream
++ e.addSuppressed(e2);
+ }
drain(dis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 30cafef,facb906..fca6aa7
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -74,14 -75,12 +74,14 @@@ public class CompressedStreamReader ext
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
-
- CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
+ CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
+ inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
- SSTableWriter writer = null;
+ StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
++ SSTableMultiWriter writer = null;
try
{
+ writer = createWriter(cfs, totalSize, repairedAt, format);
for (Pair<Long, Long> section : sections)
{
assert cis.getTotalCompressedBytesRead() <= totalSize;
@@@ -102,7 -102,18 +102,12 @@@
}
catch (Throwable e)
{
- SSTableMultiWriter.abortOrDie(writer);
+ if (writer != null)
+ {
- try
- {
- writer.abort();
- }
- catch (Throwable e2)
- {
- // add abort error to original and continue so we can drain unread stream
- e.addSuppressed(e2);
- }
++ Throwable e2 = writer.abort(null);
++ // add abort error to original and continue so we can drain unread stream
++ e.addSuppressed(e2);
+ }
drain(cis, in.getBytesRead());
if (e instanceof IOException)
throw (IOException) e;