You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/06/03 08:14:28 UTC

[1/4] git commit: Fix infinite loop on exception while streaming

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 371507a12 -> 0f2d7d0b9


Fix infinite loop on exception while streaming

Patch by JoshuaMcKenzie; reviewed by marcuse for CASSANDRA-7330


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/58bb974a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/58bb974a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/58bb974a

Branch: refs/heads/cassandra-2.1
Commit: 58bb974a330ab71d2ee34cace565f489e26629c7
Parents: 62d9c43
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 3 07:54:21 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 3 08:01:34 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                          |  1 +
 .../org/apache/cassandra/streaming/StreamReader.java | 15 +++++++++++++--
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/58bb974a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bc95a8d..37105f3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
  * Add authentication support to shuffle (CASSANDRA-6484)
  * Cqlsh counts non-empty lines for "Blank lines" warning (CASSANDRA-7325)
  * Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
+ * Fix infinite loop on exception while streaming (CASSANDRA-7330)
 Merged from 1.2:
  * Fix availability validation for LOCAL_ONE CL (CASSANDRA-7319)
  * Use LOCAL_ONE for non-superuser auth queries (CASSANDRA-7328)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58bb974a/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 72c239c..15aa3cb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -114,9 +114,20 @@ public class StreamReader
     protected void drain(InputStream dis, long bytesRead) throws IOException
     {
         long toSkip = totalSize() - bytesRead;
-        toSkip = toSkip - dis.skip(toSkip);
+
+        // InputStream.skip can return -1 if dis is inaccessible.
+        long skipped = dis.skip(toSkip);
+        if (skipped == -1)
+            return;
+
+        toSkip = toSkip - skipped;
         while (toSkip > 0)
-            toSkip = toSkip - dis.skip(toSkip);
+        {
+            skipped = dis.skip(toSkip);
+            if (skipped == -1)
+                break;
+            toSkip = toSkip - skipped;
+        }
     }
 
     protected long totalSize()


[4/4] git commit: Don't fail streams on failure detector downs

Posted by ma...@apache.org.
Don't fail streams on failure detector downs

Patch by JoshuaMcKenzie; reviewed by marcuse for CASSANDRA-3569


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f2d7d0b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f2d7d0b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f2d7d0b

Branch: refs/heads/cassandra-2.1
Commit: 0f2d7d0b9540efa3ea3dfe4f8270c3635afdc63c
Parents: 878990c
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 3 08:11:56 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 3 08:11:56 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 debian/cassandra-sysctl.conf                    |  1 +
 .../org/apache/cassandra/repair/RepairJob.java  | 15 ++++++--
 .../apache/cassandra/repair/RepairSession.java  | 37 ++++++++++++++++----
 .../cassandra/service/ActiveRepairService.java  |  1 -
 .../cassandra/streaming/ConnectionHandler.java  |  1 +
 .../cassandra/streaming/StreamSession.java      | 22 ++----------
 .../cassandra/streaming/StreamTransferTask.java | 12 +++++--
 8 files changed, 58 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 333606a..b5c2feb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 2.1.0
  * Upgrade to Pig 0.12.1 (CASSANDRA-6556)
  * Make sure we clear out repair sessions from netstats (CASSANDRA-7329)
+ * Don't fail streams on failure detector downs (CASSANDRA-3569)
 Merged from 2.0:
  * Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
  * Make StreamSession#closeSession() idempotent (CASSANDRA-7262)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/debian/cassandra-sysctl.conf
----------------------------------------------------------------------
diff --git a/debian/cassandra-sysctl.conf b/debian/cassandra-sysctl.conf
index 2173765..443e83f 100644
--- a/debian/cassandra-sysctl.conf
+++ b/debian/cassandra-sysctl.conf
@@ -1 +1,2 @@
 vm.max_map_count = 1048575
+net.ipv4.tcp_keepalive_time=300

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index af00403..8057ed5 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -58,11 +58,21 @@ public class RepairJob
     /* Count down as sync completes */
     private AtomicInteger waitForSync;
 
+    private final IRepairJobEventListener listener;
+
     /**
      * Create repair job to run on specific columnfamily
      */
-    public RepairJob(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
+    public RepairJob(IRepairJobEventListener listener,
+                     UUID parentSessionId,
+                     UUID sessionId,
+                     String keyspace,
+                     String columnFamily,
+                     Range<Token> range,
+                     boolean isSequential,
+                     ListeningExecutorService taskExecutor)
     {
+        this.listener = listener;
         this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
         this.isSequential = isSequential;
         this.taskExecutor = taskExecutor;
@@ -114,7 +124,8 @@ public class RepairJob
                 public void onFailure(Throwable throwable)
                 {
                     // TODO need to propagate error to RepairSession
-                    logger.error("Error while snapshot", throwable);
+                    logger.error("Error occurred during snapshot phase", throwable);
+                    listener.failedSnapshot();
                     failed = true;
                 }
             }, taskExecutor);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 507dafa..346f3f4 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -74,7 +74,9 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
  * Similarly, if a job is sequential, it will handle one Differencer at a time, but will handle
  * all of them in parallel otherwise.
  */
-public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber,
+                                                              IFailureDetectionEventListener,
+                                                              IRepairJobEventListener
 {
     private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
 
@@ -89,9 +91,11 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
 
     private volatile Exception exception;
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
+    private final AtomicBoolean fdUnregistered = new AtomicBoolean(false);
 
     // First, all RepairJobs are added to this queue,
     final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<>();
+
     // and after receiving all validation, the job is moved to
     // this map, keyed by CF name.
     final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
@@ -169,23 +173,32 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
         assert job.desc.equals(desc);
         if (job.addTree(endpoint, tree) == 0)
         {
-            logger.debug("All response received for {}/{}", getId(), desc.columnFamily);
+            logger.debug("All responses received for {}/{}", getId(), desc.columnFamily);
             if (!job.isFailed())
             {
                 syncingJobs.put(job.desc.columnFamily, job);
                 job.submitDifferencers();
             }
 
-            // This job is complete, switching to next in line (note that only
-            // one thread will can ever do this)
+            // This job is complete, switching to next in line (note that only one thread will ever do this)
             jobs.poll();
             RepairJob nextJob = jobs.peek();
             if (nextJob == null)
+            {
+                // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
+                // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
+                // See CASSANDRA-3569
+                if (fdUnregistered.compareAndSet(false, true))
+                    FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+
                 // We are done with this repair session as far as differencing
                 // is considered. Just inform the session
                 differencingDone.signalAll();
+            }
             else
+            {
                 nextJob.sendTreeRequests(endpoints);
+            }
         }
     }
 
@@ -271,7 +284,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
             // Create and queue a RepairJob for each column family
             for (String cfname : cfnames)
             {
-                RepairJob job = new RepairJob(parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
+                RepairJob job = new RepairJob(this, parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
                 jobs.offer(job);
             }
             logger.debug("Sending tree requests to endpoints {}", endpoints);
@@ -299,7 +312,13 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
         {
             // mark this session as terminated
             terminate();
+
             ActiveRepairService.instance.removeFromActiveSessions(this);
+
+            // If we've reached here in an exception state without completing Merkle Tree sync, we'll still be registered
+            // with the FailureDetector.
+            if (fdUnregistered.compareAndSet(false, true))
+                FailureDetector.instance.unregisterFailureDetectionEventListener(this);
         }
     }
 
@@ -320,11 +339,17 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
         completed.signalAll();
     }
 
+    public void failedSnapshot()
+    {
+        exception = new IOException("Failed during snapshot creation.");
+        forceShutdown();
+    }
+
     void failedNode(InetAddress remote)
     {
         String errorMsg = String.format("Endpoint %s died", remote);
         exception = new IOException(errorMsg);
-        // If a node failed, we stop everything (though there could still be some activity in the background)
+        // If a node failed during Merkle creation, we stop everything (though there could still be some activity in the background)
         forceShutdown();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b300547..7f7325b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -135,7 +135,6 @@ public class ActiveRepairService
 
     public void removeFromActiveSessions(RepairSession session)
     {
-        FailureDetector.instance.unregisterFailureDetectionEventListener(session);
         Gossiper.instance.unregister(session);
         sessions.remove(session.getId());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 5484c83..5716ae9 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -123,6 +123,7 @@ public class ConnectionHandler
             {
                 Socket socket = OutboundTcpConnectionPool.newSocket(peer);
                 socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
+                socket.setKeepAlive(true);
                 return socket;
             }
             catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 411f969..1afc07e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -108,7 +108,7 @@ import org.apache.cassandra.utils.Pair;
  *       session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and
  *       send a CompleteMessage to the other side.
  */
-public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class StreamSession implements IEndpointStateChangeSubscriber
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
     public final InetAddress peer;
@@ -181,10 +181,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
     public void init(StreamResultFuture streamResult)
     {
         this.streamResult = streamResult;
-
-        // register to gossiper/FD to fail on node failure
-        Gossiper.instance.register(this);
-        FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
 
     public void start()
@@ -358,8 +354,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
             // incoming thread (so we would deadlock).
             handler.close();
 
-            Gossiper.instance.unregister(this);
-            FailureDetector.instance.unregisterFailureDetectionEventListener(this);
             streamResult.handleSessionComplete(this);
         }
     }
@@ -613,23 +607,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
 
     public void onRemove(InetAddress endpoint)
     {
-        convict(endpoint, Double.MAX_VALUE);
+        closeSession(State.FAILED);
     }
 
     public void onRestart(InetAddress endpoint, EndpointState epState)
     {
-        convict(endpoint, Double.MAX_VALUE);
-    }
-
-    public void convict(InetAddress endpoint, double phi)
-    {
-        if (!endpoint.equals(peer))
-            return;
-
-        // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost.
-        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
-            return;
-
         closeSession(State.FAILED);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 2fe75fa..48a7d89 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.streaming;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
@@ -33,6 +34,7 @@ public class StreamTransferTask extends StreamTask
     private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
 
     private final AtomicInteger sequenceNumber = new AtomicInteger(0);
+    private AtomicBoolean aborted = new AtomicBoolean(false);
 
     private final Map<Integer, OutgoingFileMessage> files = new ConcurrentHashMap<>();
 
@@ -75,11 +77,15 @@ public class StreamTransferTask extends StreamTask
 
     public void abort()
     {
-        for (OutgoingFileMessage file : files.values())
+        // Prevent releasing reference multiple times
+        if (aborted.compareAndSet(false, true))
         {
-            file.sstable.releaseReference();
+            for (OutgoingFileMessage file : files.values())
+            {
+                file.sstable.releaseReference();
+            }
+            timeoutExecutor.shutdownNow();
         }
-        timeoutExecutor.shutdownNow();
     }
 
     public int getTotalNumberOfFiles()


[3/4] git commit: Make sure we clear out repair sessions from netstats

Posted by ma...@apache.org.
Make sure we clear out repair sessions from netstats

Patch by JoshuaMcKenzie; reviewed by marcuse for CASSANDRA-7329


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/878990cd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/878990cd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/878990cd

Branch: refs/heads/cassandra-2.1
Commit: 878990cde853d2e43f1d1da107878ddafcef6262
Parents: d0c90e0
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 3 08:04:52 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 3 08:04:52 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                                     | 1 +
 src/java/org/apache/cassandra/streaming/StreamCoordinator.java  | 5 +++++
 src/java/org/apache/cassandra/streaming/StreamResultFuture.java | 2 +-
 3 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/878990cd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 188326d..333606a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.1.0
  * Upgrade to Pig 0.12.1 (CASSANDRA-6556)
+ * Make sure we clear out repair sessions from netstats (CASSANDRA-7329)
 Merged from 2.0:
  * Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
  * Make StreamSession#closeSession() idempotent (CASSANDRA-7262)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/878990cd/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index ed94f89..4a6b193 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -73,6 +73,11 @@ public class StreamCoordinator
         return results;
     }
 
+    public boolean isReceiving()
+    {
+        return connectionsPerHost == 0;
+    }
+
     public void connectAllStreamSessions()
     {
         for (HostStreamingData data : peerSessions.values())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/878990cd/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index cc7abbb..6a143d1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -65,7 +65,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         this.coordinator = coordinator;
 
         // if there is no session to listen to, we immediately set result for returning
-        if (!coordinator.hasActiveSessions())
+        if (!coordinator.isReceiving() && !coordinator.hasActiveSessions())
             set(getCurrentState());
     }
 


[2/4] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by ma...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d0c90e07
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d0c90e07
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d0c90e07

Branch: refs/heads/cassandra-2.1
Commit: d0c90e07ab3fea2aff80e73fa3bc58bef756ad68
Parents: 371507a 58bb974
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 3 08:01:50 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 3 08:01:50 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                          |  1 +
 .../org/apache/cassandra/streaming/StreamReader.java | 15 +++++++++++++--
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0c90e07/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c7c6877,37105f3..188326d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,9 -1,19 +1,10 @@@
 -2.0.9
 +2.1.0
 + * Upgrade to Pig 0.12.1 (CASSANDRA-6556)
 +Merged from 2.0:
   * Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
 - * Add conditional CREATE/DROP USER support (CASSANDRA-7264)
 - * Swap local and global default read repair chances (CASSANDRA-7320)
 - * Add missing iso8601 patterns for date strings (CASSANDRA-6973)
 - * Support selecting multiple rows in a partition using IN (CASSANDRA-6875)
 - * cqlsh: always emphasize the partition key in DESC output (CASSANDRA-7274)
 - * Copy compaction options to make sure they are reloaded (CASSANDRA-7290)
 - * Add option to do more aggressive tombstone compactions (CASSANDRA-6563)
 - * Don't try to compact already-compacting files in HHOM (CASSANDRA-7288)
 - * Add authentication support to shuffle (CASSANDRA-6484)
 - * Cqlsh counts non-empty lines for "Blank lines" warning (CASSANDRA-7325)
   * Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
+  * Fix infinite loop on exception while streaming (CASSANDRA-7330)
  Merged from 1.2:
 - * Fix availability validation for LOCAL_ONE CL (CASSANDRA-7319)
   * Use LOCAL_ONE for non-superuser auth queries (CASSANDRA-7328)
  
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0c90e07/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------