You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/11/03 17:08:00 UTC

[01/10] cassandra git commit: Fix streaming to catch exception so retry not fail

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 986a1a769 -> 068614ccc
  refs/heads/cassandra-2.2 82aa7969c -> a549bd085
  refs/heads/cassandra-3.0 0ad0de139 -> 87f5e2e39
  refs/heads/trunk 468a4c58c -> f505e8bdf


Fix streaming to catch exception so retry not fail

patch by yukim; reviewed by Paulo Motta for CASSANDRA-10557


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/068614cc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/068614cc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/068614cc

Branch: refs/heads/cassandra-2.1
Commit: 068614ccc7ba6c5b8ccb50a0840af57bb45b4b36
Parents: 986a1a7
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Oct 20 14:25:55 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:33:37 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/streaming/StreamReader.java   | 16 ++++++++++++++--
 .../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
 3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d22b91..4c24b35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
  * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
  * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
  * Do STCS in DTCS windows (CASSANDRA-10276)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index c96a925..5389a80 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -89,11 +89,12 @@ public class StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
+        SSTableWriter writer = null;
         try
         {
+            writer = createWriter(cfs, totalSize, repairedAt);
             while (in.getBytesRead() < totalSize)
             {
                 writeRow(writer, in, cfs);
@@ -104,7 +105,18 @@ public class StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            if (writer != null)
+            {
+                try
+                {
+                    writer.abort();
+                }
+                catch (Throwable e2)
+                {
+                    // add abort error to original and continue so we can drain unread stream
+                    e.addSuppressed(e2);
+                }
+            }
             drain(dis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index fb2599f..0529496 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -72,12 +72,12 @@ public class CompressedStreamReader extends StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
-
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+        SSTableWriter writer = null;
         try
         {
+            writer = createWriter(cfs, totalSize, repairedAt);
             for (Pair<Long, Long> section : sections)
             {
                 long length = section.right - section.left;
@@ -95,7 +95,18 @@ public class CompressedStreamReader extends StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            if (writer != null)
+            {
+                try
+                {
+                    writer.abort();
+                }
+                catch (Throwable e2)
+                {
+                    // add abort error to original and continue so we can drain unread stream
+                    e.addSuppressed(e2);
+                }
+            }
             drain(cis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;


[07/10] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a549bd08
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a549bd08
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a549bd08

Branch: refs/heads/trunk
Commit: a549bd085f5244b3271249ce881ac30dd3f27553
Parents: 82aa796 068614c
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 3 09:40:49 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:40:49 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/streaming/StreamReader.java   | 17 ++++++++++++++---
 .../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
 3 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac997f2,4c24b35..5c23acf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
 -2.1.12
 +2.2.4
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
   * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
   * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
   * Do STCS in DTCS windows (CASSANDRA-10276)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 1a3980d,5389a80..1ccebb0
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -94,12 -89,12 +94,12 @@@ public class StreamReade
          }
          ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- 
          DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
          BytesReadTracker in = new BytesReadTracker(dis);
+         SSTableWriter writer = null;
          try
          {
 -            writer = createWriter(cfs, totalSize, repairedAt);
++            writer = createWriter(cfs, totalSize, repairedAt, format);
              while (in.getBytesRead() < totalSize)
              {
                  writeRow(writer, in, cfs);
@@@ -108,9 -102,21 +108,20 @@@
                  session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
              return writer;
 -        }
 -        catch (Throwable e)
 +        } catch (Throwable e)
          {
-             writer.abort();
+             if (writer != null)
+             {
+                 try
+                 {
+                     writer.abort();
+                 }
+                 catch (Throwable e2)
+                 {
+                     // add abort error to original and continue so we can drain unread stream
+                     e.addSuppressed(e2);
+                 }
+             }
              drain(dis, in.getBytesRead());
              if (e instanceof IOException)
                  throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 1936a94,0529496..facb906
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -75,17 -72,15 +75,17 @@@ public class CompressedStreamReader ext
          }
          ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- 
 -        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
 +        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
          BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+         SSTableWriter writer = null;
          try
          {
 -            writer = createWriter(cfs, totalSize, repairedAt);
++            writer = createWriter(cfs, totalSize, repairedAt, format);
              for (Pair<Long, Long> section : sections)
              {
 -                long length = section.right - section.left;
 +                assert cis.getTotalCompressedBytesRead() <= totalSize;
 +                int sectionLength = (int) (section.right - section.left);
 +
                  // skip to beginning of section inside chunk
                  cis.position(section.left);
                  in.reset(0);


[06/10] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a549bd08
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a549bd08
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a549bd08

Branch: refs/heads/cassandra-2.2
Commit: a549bd085f5244b3271249ce881ac30dd3f27553
Parents: 82aa796 068614c
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 3 09:40:49 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:40:49 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/streaming/StreamReader.java   | 17 ++++++++++++++---
 .../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
 3 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac997f2,4c24b35..5c23acf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
 -2.1.12
 +2.2.4
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
   * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
   * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
   * Do STCS in DTCS windows (CASSANDRA-10276)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 1a3980d,5389a80..1ccebb0
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -94,12 -89,12 +94,12 @@@ public class StreamReade
          }
          ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- 
          DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
          BytesReadTracker in = new BytesReadTracker(dis);
+         SSTableWriter writer = null;
          try
          {
 -            writer = createWriter(cfs, totalSize, repairedAt);
++            writer = createWriter(cfs, totalSize, repairedAt, format);
              while (in.getBytesRead() < totalSize)
              {
                  writeRow(writer, in, cfs);
@@@ -108,9 -102,21 +108,20 @@@
                  session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
              return writer;
 -        }
 -        catch (Throwable e)
 +        } catch (Throwable e)
          {
-             writer.abort();
+             if (writer != null)
+             {
+                 try
+                 {
+                     writer.abort();
+                 }
+                 catch (Throwable e2)
+                 {
+                     // add abort error to original and continue so we can drain unread stream
+                     e.addSuppressed(e2);
+                 }
+             }
              drain(dis, in.getBytesRead());
              if (e instanceof IOException)
                  throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 1936a94,0529496..facb906
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -75,17 -72,15 +75,17 @@@ public class CompressedStreamReader ext
          }
          ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- 
 -        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
 +        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
          BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+         SSTableWriter writer = null;
          try
          {
 -            writer = createWriter(cfs, totalSize, repairedAt);
++            writer = createWriter(cfs, totalSize, repairedAt, format);
              for (Pair<Long, Long> section : sections)
              {
 -                long length = section.right - section.left;
 +                assert cis.getTotalCompressedBytesRead() <= totalSize;
 +                int sectionLength = (int) (section.right - section.left);
 +
                  // skip to beginning of section inside chunk
                  cis.position(section.left);
                  in.reset(0);


[04/10] cassandra git commit: Fix streaming to catch exception so retry not fail

Posted by yu...@apache.org.
Fix streaming to catch exception so retry not fail

patch by yukim; reviewed by Paulo Motta for CASSANDRA-10557


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/068614cc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/068614cc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/068614cc

Branch: refs/heads/trunk
Commit: 068614ccc7ba6c5b8ccb50a0840af57bb45b4b36
Parents: 986a1a7
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Oct 20 14:25:55 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:33:37 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/streaming/StreamReader.java   | 16 ++++++++++++++--
 .../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
 3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d22b91..4c24b35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
  * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
  * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
  * Do STCS in DTCS windows (CASSANDRA-10276)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index c96a925..5389a80 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -89,11 +89,12 @@ public class StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
+        SSTableWriter writer = null;
         try
         {
+            writer = createWriter(cfs, totalSize, repairedAt);
             while (in.getBytesRead() < totalSize)
             {
                 writeRow(writer, in, cfs);
@@ -104,7 +105,18 @@ public class StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            if (writer != null)
+            {
+                try
+                {
+                    writer.abort();
+                }
+                catch (Throwable e2)
+                {
+                    // add abort error to original and continue so we can drain unread stream
+                    e.addSuppressed(e2);
+                }
+            }
             drain(dis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index fb2599f..0529496 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -72,12 +72,12 @@ public class CompressedStreamReader extends StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
-
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+        SSTableWriter writer = null;
         try
         {
+            writer = createWriter(cfs, totalSize, repairedAt);
             for (Pair<Long, Long> section : sections)
             {
                 long length = section.right - section.left;
@@ -95,7 +95,18 @@ public class CompressedStreamReader extends StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            if (writer != null)
+            {
+                try
+                {
+                    writer.abort();
+                }
+                catch (Throwable e2)
+                {
+                    // add abort error to original and continue so we can drain unread stream
+                    e.addSuppressed(e2);
+                }
+            }
             drain(cis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;


[05/10] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a549bd08
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a549bd08
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a549bd08

Branch: refs/heads/cassandra-3.0
Commit: a549bd085f5244b3271249ce881ac30dd3f27553
Parents: 82aa796 068614c
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 3 09:40:49 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:40:49 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/streaming/StreamReader.java   | 17 ++++++++++++++---
 .../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
 3 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ac997f2,4c24b35..5c23acf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
 -2.1.12
 +2.2.4
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
   * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
   * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
   * Do STCS in DTCS windows (CASSANDRA-10276)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 1a3980d,5389a80..1ccebb0
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -94,12 -89,12 +94,12 @@@ public class StreamReade
          }
          ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- 
          DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
          BytesReadTracker in = new BytesReadTracker(dis);
+         SSTableWriter writer = null;
          try
          {
 -            writer = createWriter(cfs, totalSize, repairedAt);
++            writer = createWriter(cfs, totalSize, repairedAt, format);
              while (in.getBytesRead() < totalSize)
              {
                  writeRow(writer, in, cfs);
@@@ -108,9 -102,21 +108,20 @@@
                  session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
              return writer;
 -        }
 -        catch (Throwable e)
 +        } catch (Throwable e)
          {
-             writer.abort();
+             if (writer != null)
+             {
+                 try
+                 {
+                     writer.abort();
+                 }
+                 catch (Throwable e2)
+                 {
+                     // add abort error to original and continue so we can drain unread stream
+                     e.addSuppressed(e2);
+                 }
+             }
              drain(dis, in.getBytesRead());
              if (e instanceof IOException)
                  throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a549bd08/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 1936a94,0529496..facb906
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -75,17 -72,15 +75,17 @@@ public class CompressedStreamReader ext
          }
          ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- 
 -        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
 +        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
          BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+         SSTableWriter writer = null;
          try
          {
 -            writer = createWriter(cfs, totalSize, repairedAt);
++            writer = createWriter(cfs, totalSize, repairedAt, format);
              for (Pair<Long, Long> section : sections)
              {
 -                long length = section.right - section.left;
 +                assert cis.getTotalCompressedBytesRead() <= totalSize;
 +                int sectionLength = (int) (section.right - section.left);
 +
                  // skip to beginning of section inside chunk
                  cis.position(section.left);
                  in.reset(0);


[03/10] cassandra git commit: Fix streaming to catch exception so retry not fail

Posted by yu...@apache.org.
Fix streaming to catch exception so retry not fail

patch by yukim; reviewed by Paulo Motta for CASSANDRA-10557


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/068614cc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/068614cc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/068614cc

Branch: refs/heads/cassandra-3.0
Commit: 068614ccc7ba6c5b8ccb50a0840af57bb45b4b36
Parents: 986a1a7
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Oct 20 14:25:55 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:33:37 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/streaming/StreamReader.java   | 16 ++++++++++++++--
 .../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
 3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d22b91..4c24b35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
  * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
  * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
  * Do STCS in DTCS windows (CASSANDRA-10276)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index c96a925..5389a80 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -89,11 +89,12 @@ public class StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
+        SSTableWriter writer = null;
         try
         {
+            writer = createWriter(cfs, totalSize, repairedAt);
             while (in.getBytesRead() < totalSize)
             {
                 writeRow(writer, in, cfs);
@@ -104,7 +105,18 @@ public class StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            if (writer != null)
+            {
+                try
+                {
+                    writer.abort();
+                }
+                catch (Throwable e2)
+                {
+                    // add abort error to original and continue so we can drain unread stream
+                    e.addSuppressed(e2);
+                }
+            }
             drain(dis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index fb2599f..0529496 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -72,12 +72,12 @@ public class CompressedStreamReader extends StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
-
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+        SSTableWriter writer = null;
         try
         {
+            writer = createWriter(cfs, totalSize, repairedAt);
             for (Pair<Long, Long> section : sections)
             {
                 long length = section.right - section.left;
@@ -95,7 +95,18 @@ public class CompressedStreamReader extends StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            if (writer != null)
+            {
+                try
+                {
+                    writer.abort();
+                }
+                catch (Throwable e2)
+                {
+                    // add abort error to original and continue so we can drain unread stream
+                    e.addSuppressed(e2);
+                }
+            }
             drain(cis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;


[02/10] cassandra git commit: Fix streaming to catch exception so retry not fail

Posted by yu...@apache.org.
Fix streaming to catch exception so retry not fail

patch by yukim; reviewed by Paulo Motta for CASSANDRA-10557


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/068614cc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/068614cc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/068614cc

Branch: refs/heads/cassandra-2.2
Commit: 068614ccc7ba6c5b8ccb50a0840af57bb45b4b36
Parents: 986a1a7
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Oct 20 14:25:55 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 09:33:37 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/streaming/StreamReader.java   | 16 ++++++++++++++--
 .../streaming/compress/CompressedStreamReader.java | 17 ++++++++++++++---
 3 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d22b91..4c24b35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
  * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
  * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
  * Do STCS in DTCS windows (CASSANDRA-10276)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index c96a925..5389a80 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -89,11 +89,12 @@ public class StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
+        SSTableWriter writer = null;
         try
         {
+            writer = createWriter(cfs, totalSize, repairedAt);
             while (in.getBytesRead() < totalSize)
             {
                 writeRow(writer, in, cfs);
@@ -104,7 +105,18 @@ public class StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            if (writer != null)
+            {
+                try
+                {
+                    writer.abort();
+                }
+                catch (Throwable e2)
+                {
+                    // add abort error to original and continue so we can drain unread stream
+                    e.addSuppressed(e2);
+                }
+            }
             drain(dis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/068614cc/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index fb2599f..0529496 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -72,12 +72,12 @@ public class CompressedStreamReader extends StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt);
-
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+        SSTableWriter writer = null;
         try
         {
+            writer = createWriter(cfs, totalSize, repairedAt);
             for (Pair<Long, Long> section : sections)
             {
                 long length = section.right - section.left;
@@ -95,7 +95,18 @@ public class CompressedStreamReader extends StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            if (writer != null)
+            {
+                try
+                {
+                    writer.abort();
+                }
+                catch (Throwable e2)
+                {
+                    // add abort error to original and continue so we can drain unread stream
+                    e.addSuppressed(e2);
+                }
+            }
             drain(cis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;


[10/10] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f505e8bd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f505e8bd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f505e8bd

Branch: refs/heads/trunk
Commit: f505e8bdfe1edc13a3dbdb60ccf5d8bc9b710cd3
Parents: 468a4c5 87f5e2e
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 3 10:03:40 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 10:03:40 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                             |  1 +
 .../org/apache/cassandra/streaming/StreamReader.java    | 12 ++++++++----
 .../streaming/compress/CompressedStreamReader.java      | 11 ++++++++---
 3 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f505e8bd/CHANGES.txt
----------------------------------------------------------------------


[08/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87f5e2e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87f5e2e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87f5e2e3

Branch: refs/heads/cassandra-3.0
Commit: 87f5e2e39c1003c36eba97a92721920f87db3fed
Parents: 0ad0de1 a549bd0
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 3 10:03:34 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 10:03:34 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                             |  1 +
 .../org/apache/cassandra/streaming/StreamReader.java    | 12 ++++++++----
 .../streaming/compress/CompressedStreamReader.java      | 11 ++++++++---
 3 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1724f01,5c23acf..e0208c6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,12 +1,27 @@@
 -2.2.4
 - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581)
 +3.0
 + * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602)
 + * Don't use 'names query' read path for counters (CASSANDRA-10572)
 + * Fix backward compatibility for counters (CASSANDRA-10470)
 + * Remove memory_allocator parameter from cassandra.yaml (CASSANDRA-10581)
 + * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604)
 + * Fix thrift cas operations with defined columns (CASSANDRA-10576)
 + * Fix PartitionUpdate.operationCount() for updates with static column operations (CASSANDRA-10606)
 + * Fix thrift get() queries with defined columns (CASSANDRA-10586)
 + * Fix marking of indexes as built and removed (CASSANDRA-10601)
 + * Skip initialization of non-registered 2i instances, remove Index::getIndexName (CASSANDRA-10595)
 + * Fix batches on multiple tables (CASSANDRA-10554)
 + * Ensure compaction options are validated when updating KeyspaceMetadata (CASSANDRA-10569)
 + * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
 + * Remove token generator (CASSANDRA-5261)
 + * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562)
 + * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421)
 + * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360)
 + * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367)
 +Merged from 2.2:
   * Expose phi values from failure detector via JMX and tweak debug
     and trace logging (CASSANDRA-9526)
 - * Fix RangeNamesQueryPager (CASSANDRA-10509)
 - * Deprecate Pig support (CASSANDRA-10542)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+  * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
   * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
   * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
   * Do STCS in DTCS windows (CASSANDRA-10276)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 879491e,1ccebb0..6169494
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -97,25 -94,34 +97,29 @@@ public class StreamReade
          }
          ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- 
          DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
          BytesReadTracker in = new BytesReadTracker(dis);
 -        SSTableWriter writer = null;
 +        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
++        SSTableMultiWriter writer = null;
          try
          {
+             writer = createWriter(cfs, totalSize, repairedAt, format);
              while (in.getBytesRead() < totalSize)
              {
 -                writeRow(writer, in, cfs);
 -
 +                writePartition(deserializer, writer, cfs);
                  // TODO move this to BytesReadTracker
                  session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
              return writer;
 -        } catch (Throwable e)
 +        }
 +        catch (Throwable e)
          {
-             SSTableMultiWriter.abortOrDie(writer);
- 
+             if (writer != null)
+             {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
++                Throwable e2 = writer.abort(null);
++                // add abort error to original and continue so we can drain unread stream
++                e.addSuppressed(e2);
+             }
              drain(dis, in.getBytesRead());
              if (e instanceof IOException)
                  throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 30cafef,facb906..fca6aa7
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -74,14 -75,12 +74,14 @@@ public class CompressedStreamReader ext
          }
          ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- 
 -        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
 +        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
 +                                                              inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
          BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
 -        SSTableWriter writer = null;
 +        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
++        SSTableMultiWriter writer = null;
          try
          {
+             writer = createWriter(cfs, totalSize, repairedAt, format);
              for (Pair<Long, Long> section : sections)
              {
                  assert cis.getTotalCompressedBytesRead() <= totalSize;
@@@ -102,7 -102,18 +102,12 @@@
          }
          catch (Throwable e)
          {
-             SSTableMultiWriter.abortOrDie(writer);
+             if (writer != null)
+             {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
++                Throwable e2 = writer.abort(null);
++                // add abort error to original and continue so we can drain unread stream
++                e.addSuppressed(e2);
+             }
              drain(cis, in.getBytesRead());
              if (e instanceof IOException)
                  throw (IOException) e;


[09/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87f5e2e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87f5e2e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87f5e2e3

Branch: refs/heads/trunk
Commit: 87f5e2e39c1003c36eba97a92721920f87db3fed
Parents: 0ad0de1 a549bd0
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Nov 3 10:03:34 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Nov 3 10:03:34 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                             |  1 +
 .../org/apache/cassandra/streaming/StreamReader.java    | 12 ++++++++----
 .../streaming/compress/CompressedStreamReader.java      | 11 ++++++++---
 3 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1724f01,5c23acf..e0208c6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,12 +1,27 @@@
 -2.2.4
 - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581)
 +3.0
 + * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602)
 + * Don't use 'names query' read path for counters (CASSANDRA-10572)
 + * Fix backward compatibility for counters (CASSANDRA-10470)
 + * Remove memory_allocator parameter from cassandra.yaml (CASSANDRA-10581)
 + * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604)
 + * Fix thrift cas operations with defined columns (CASSANDRA-10576)
 + * Fix PartitionUpdate.operationCount() for updates with static column operations (CASSANDRA-10606)
 + * Fix thrift get() queries with defined columns (CASSANDRA-10586)
 + * Fix marking of indexes as built and removed (CASSANDRA-10601)
 + * Skip initialization of non-registered 2i instances, remove Index::getIndexName (CASSANDRA-10595)
 + * Fix batches on multiple tables (CASSANDRA-10554)
 + * Ensure compaction options are validated when updating KeyspaceMetadata (CASSANDRA-10569)
 + * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
 + * Remove token generator (CASSANDRA-5261)
 + * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562)
 + * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421)
 + * Fix handling of range tombstones when reading old format sstables (CASSANDRA-10360)
 + * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367)
 +Merged from 2.2:
   * Expose phi values from failure detector via JMX and tweak debug
     and trace logging (CASSANDRA-9526)
 - * Fix RangeNamesQueryPager (CASSANDRA-10509)
 - * Deprecate Pig support (CASSANDRA-10542)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+  * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
   * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
   * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
   * Do STCS in DTCS windows (CASSANDRA-10276)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 879491e,1ccebb0..6169494
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -97,25 -94,34 +97,29 @@@ public class StreamReade
          }
          ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- 
          DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
          BytesReadTracker in = new BytesReadTracker(dis);
 -        SSTableWriter writer = null;
 +        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
++        SSTableMultiWriter writer = null;
          try
          {
+             writer = createWriter(cfs, totalSize, repairedAt, format);
              while (in.getBytesRead() < totalSize)
              {
 -                writeRow(writer, in, cfs);
 -
 +                writePartition(deserializer, writer, cfs);
                  // TODO move this to BytesReadTracker
                  session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
              return writer;
 -        } catch (Throwable e)
 +        }
 +        catch (Throwable e)
          {
-             SSTableMultiWriter.abortOrDie(writer);
- 
+             if (writer != null)
+             {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
++                Throwable e2 = writer.abort(null);
++                // add abort error to original and continue so we can drain unread stream
++                e.addSuppressed(e2);
+             }
              drain(dis, in.getBytesRead());
              if (e instanceof IOException)
                  throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 30cafef,facb906..fca6aa7
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -74,14 -75,12 +74,14 @@@ public class CompressedStreamReader ext
          }
          ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
- 
 -        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
 +        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
 +                                                              inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
          BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
 -        SSTableWriter writer = null;
 +        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
++        SSTableMultiWriter writer = null;
          try
          {
+             writer = createWriter(cfs, totalSize, repairedAt, format);
              for (Pair<Long, Long> section : sections)
              {
                  assert cis.getTotalCompressedBytesRead() <= totalSize;
@@@ -102,7 -102,18 +102,12 @@@
          }
          catch (Throwable e)
          {
-             SSTableMultiWriter.abortOrDie(writer);
+             if (writer != null)
+             {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
++                Throwable e2 = writer.abort(null);
++                // add abort error to original and continue so we can drain unread stream
++                e.addSuppressed(e2);
+             }
              drain(cis, in.getBytesRead());
              if (e instanceof IOException)
                  throw (IOException) e;