You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/12/01 20:08:26 UTC

[03/15] cassandra git commit: Add proper error handling to stream receiver

Add proper error handling to stream receiver

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10774


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ba69a32
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ba69a32
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ba69a32

Branch: refs/heads/trunk
Commit: 5ba69a32590074610f5516a20b8198416b79dfcf
Parents: 7650fc1
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Nov 27 16:37:37 2015 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 11:53:35 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 105 ++++++++++---------
 2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a2f7b6e..3ce2da6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
  * Warn or fail when changing cluster topology live (CASSANDRA-10243)
  * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
  * Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 738c93c..8773cab 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
 
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -116,63 +117,73 @@ public class StreamReceiveTask extends StreamTask
 
         public void run()
         {
-            Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
-            if (kscf == null)
+            try
             {
-                // schema was dropped during streaming
+                Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+                if (kscf == null)
+                {
+                    // schema was dropped during streaming
+                    for (SSTableWriter writer : task.sstables)
+                        writer.abort();
+                    task.sstables.clear();
+                    task.session.taskCompleted(task);
+                    return;
+                }
+                ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+                File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+                if (lockfiledir == null)
+                    throw new IOError(new IOException("All disks full"));
+                StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+                lockfile.create(task.sstables);
+                List<SSTableReader> readers = new ArrayList<>();
                 for (SSTableWriter writer : task.sstables)
-                    writer.abort();
+                    readers.add(writer.closeAndOpenReader());
+                lockfile.delete();
                 task.sstables.clear();
-                return;
-            }
-            ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
-            File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
-            if (lockfiledir == null)
-                throw new IOError(new IOException("All disks full"));
-            StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
-            lockfile.create(task.sstables);
-            List<SSTableReader> readers = new ArrayList<>();
-            for (SSTableWriter writer : task.sstables)
-                readers.add(writer.closeAndOpenReader());
-            lockfile.delete();
-            task.sstables.clear();
-
-            try (Refs<SSTableReader> refs = Refs.ref(readers))
-            {
-                // add sstables and build secondary indexes
-                cfs.addSSTables(readers);
-                cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 
-                //invalidate row and counter cache
-                if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+                try (Refs<SSTableReader> refs = Refs.ref(readers))
                 {
-                    List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
-                    for (SSTableReader sstable : readers)
-                        boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
-                    Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+                    // add sstables and build secondary indexes
+                    cfs.addSSTables(readers);
+                    cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 
-                    if (cfs.isRowCacheEnabled())
+                    //invalidate row and counter cache
+                    if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                     {
-                        int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
-                        if (invalidatedKeys > 0)
-                            logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
-                                         "receive task completed.", task.session.planId(), invalidatedKeys,
-                                         cfs.keyspace.getName(), cfs.getColumnFamilyName());
-                    }
-
-                    if (cfs.metadata.isCounter())
-                    {
-                        int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
-                        if (invalidatedKeys > 0)
-                            logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
-                                        "receive task completed.", task.session.planId(), invalidatedKeys,
-                                        cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+                        for (SSTableReader sstable : readers)
+                            boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+                        if (cfs.isRowCacheEnabled())
+                        {
+                            int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+                            if (invalidatedKeys > 0)
+                                logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+                                             "receive task completed.", task.session.planId(), invalidatedKeys,
+                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        }
+
+                        if (cfs.metadata.isCounter())
+                        {
+                            int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+                            if (invalidatedKeys > 0)
+                                logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+                                             "receive task completed.", task.session.planId(), invalidatedKeys,
+                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        }
                     }
                 }
-            }
 
-            task.session.taskCompleted(task);
+                task.session.taskCompleted(task);
+            }
+            catch (Throwable t)
+            {
+                logger.error("Error applying streamed data: ", t);
+                JVMStabilityInspector.inspectThrowable(t);
+                task.session.onError(t);
+            }
         }
     }