Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/12/01 20:08:24 UTC

[01/15] cassandra git commit: Add proper error handling to stream receiver

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 7650fc196 -> 5ba69a325
  refs/heads/cassandra-2.2 1b81ad19d -> 2491ede35
  refs/heads/cassandra-3.0 803a3d901 -> ccb20ad46
  refs/heads/cassandra-3.1 6bda8868c -> 5b6a368c9
  refs/heads/trunk 5daf8d020 -> 03863ed24


Add proper error handling to stream receiver

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10774


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ba69a32
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ba69a32
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ba69a32

Branch: refs/heads/cassandra-2.1
Commit: 5ba69a32590074610f5516a20b8198416b79dfcf
Parents: 7650fc1
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Nov 27 16:37:37 2015 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 11:53:35 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 105 ++++++++++---------
 2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a2f7b6e..3ce2da6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
  * Warn or fail when changing cluster topology live (CASSANDRA-10243)
  * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
  * Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 738c93c..8773cab 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
 
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -116,63 +117,73 @@ public class StreamReceiveTask extends StreamTask
 
         public void run()
         {
-            Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
-            if (kscf == null)
+            try
             {
-                // schema was dropped during streaming
+                Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+                if (kscf == null)
+                {
+                    // schema was dropped during streaming
+                    for (SSTableWriter writer : task.sstables)
+                        writer.abort();
+                    task.sstables.clear();
+                    task.session.taskCompleted(task);
+                    return;
+                }
+                ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+                File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+                if (lockfiledir == null)
+                    throw new IOError(new IOException("All disks full"));
+                StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+                lockfile.create(task.sstables);
+                List<SSTableReader> readers = new ArrayList<>();
                 for (SSTableWriter writer : task.sstables)
-                    writer.abort();
+                    readers.add(writer.closeAndOpenReader());
+                lockfile.delete();
                 task.sstables.clear();
-                return;
-            }
-            ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
-            File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
-            if (lockfiledir == null)
-                throw new IOError(new IOException("All disks full"));
-            StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
-            lockfile.create(task.sstables);
-            List<SSTableReader> readers = new ArrayList<>();
-            for (SSTableWriter writer : task.sstables)
-                readers.add(writer.closeAndOpenReader());
-            lockfile.delete();
-            task.sstables.clear();
-
-            try (Refs<SSTableReader> refs = Refs.ref(readers))
-            {
-                // add sstables and build secondary indexes
-                cfs.addSSTables(readers);
-                cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 
-                //invalidate row and counter cache
-                if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+                try (Refs<SSTableReader> refs = Refs.ref(readers))
                 {
-                    List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
-                    for (SSTableReader sstable : readers)
-                        boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
-                    Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+                    // add sstables and build secondary indexes
+                    cfs.addSSTables(readers);
+                    cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 
-                    if (cfs.isRowCacheEnabled())
+                    //invalidate row and counter cache
+                    if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                     {
-                        int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
-                        if (invalidatedKeys > 0)
-                            logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
-                                         "receive task completed.", task.session.planId(), invalidatedKeys,
-                                         cfs.keyspace.getName(), cfs.getColumnFamilyName());
-                    }
-
-                    if (cfs.metadata.isCounter())
-                    {
-                        int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
-                        if (invalidatedKeys > 0)
-                            logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
-                                        "receive task completed.", task.session.planId(), invalidatedKeys,
-                                        cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+                        for (SSTableReader sstable : readers)
+                            boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+                        if (cfs.isRowCacheEnabled())
+                        {
+                            int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+                            if (invalidatedKeys > 0)
+                                logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+                                             "receive task completed.", task.session.planId(), invalidatedKeys,
+                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        }
+
+                        if (cfs.metadata.isCounter())
+                        {
+                            int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+                            if (invalidatedKeys > 0)
+                                logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+                                             "receive task completed.", task.session.planId(), invalidatedKeys,
+                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        }
                     }
                 }
-            }
 
-            task.session.taskCompleted(task);
+                task.session.taskCompleted(task);
+            }
+            catch (Throwable t)
+            {
+                logger.error("Error applying streamed data: ", t);
+                JVMStabilityInspector.inspectThrowable(t);
+                task.session.onError(t);
+            }
         }
     }
 

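The net effect of the 2.1 patch above is that the whole body of OnCompletionRunnable.run() now executes inside a try block: on any Throwable the task logs the failure, gives JVMStabilityInspector a chance to decide whether the error is fatal to the process, and then fails the stream session through task.session.onError(t) instead of letting the exception escape the executor and leave the session hanging. A minimal, self-contained sketch of that shape (hypothetical stand-in types and callbacks, not Cassandra's real classes) could look like this:

import java.util.function.Consumer;

// Sketch only: illustrates the try/catch pattern from CASSANDRA-10774 with
// generic callbacks standing in for StreamSession and JVMStabilityInspector.
public class ReceiveCompletionSketch implements Runnable
{
    private final Runnable applyStreamedData;   // stands in for finishing writers, adding sstables, etc.
    private final Runnable onCompleted;         // stands in for task.session.taskCompleted(task)
    private final Consumer<Throwable> onError;  // stands in for task.session.onError(t)

    public ReceiveCompletionSketch(Runnable applyStreamedData, Runnable onCompleted, Consumer<Throwable> onError)
    {
        this.applyStreamedData = applyStreamedData;
        this.onCompleted = onCompleted;
        this.onError = onError;
    }

    public void run()
    {
        try
        {
            applyStreamedData.run();  // may throw, e.g. on full disks or a schema dropped mid-stream
            onCompleted.run();        // only reached when the streamed data was applied cleanly
        }
        catch (Throwable t)
        {
            // Analogue of JVMStabilityInspector.inspectThrowable(t): surface errors
            // the process cannot survive before notifying the session.
            if (t instanceof OutOfMemoryError)
                throw (OutOfMemoryError) t;
            onError.accept(t);        // fail the session rather than leaving it hanging
        }
    }
}
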

[11/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ccb20ad4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ccb20ad4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ccb20ad4

Branch: refs/heads/cassandra-3.1
Commit: ccb20ad46ab38961aac39cc8634f450046bdf16b
Parents: 803a3d9 2491ede
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:07:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:07:39 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 60 ++++++++++----------
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7fffbbf,7541212..a01011b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -23,7 -8,16 +23,8 @@@ Merged from 2.2
   * Fix SimpleDateType type compatibility (CASSANDRA-10027)
   * (Hadoop) fix splits calculation (CASSANDRA-10640)
   * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 - * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 - * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 - * Expose phi values from failure detector via JMX and tweak debug
 -   and trace logging (CASSANDRA-9526)
 - * Fix RangeNamesQueryPager (CASSANDRA-10509)
 - * Deprecate Pig support (CASSANDRA-10542)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+  * Add proper error handling to stream receiver (CASSANDRA-10774)
   * Warn or fail when changing cluster topology live (CASSANDRA-10243)
   * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
   * Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 54ce711,dd56b8b..dfc91f9
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -126,110 -113,73 +126,110 @@@ public class StreamReceiveTask extends 
  
          public void run()
          {
-             Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
-             if (kscf == null)
-             {
-                 // schema was dropped during streaming
-                 task.sstables.forEach(SSTableMultiWriter::abortOrDie);
- 
-                 task.sstables.clear();
-                 task.txn.abort();
-                 return;
-             }
-             ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-             boolean hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
- 
++            boolean hasViews = false;
++            ColumnFamilyStore cfs = null;
              try
              {
+                 Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+                 if (kscf == null)
+                 {
+                     // schema was dropped during streaming
 -                    for (SSTableWriter writer : task.sstables)
 -                        writer.abort();
++                    task.sstables.forEach(SSTableMultiWriter::abortOrDie);
+                     task.sstables.clear();
++                    task.txn.abort();
+                     task.session.taskCompleted(task);
+                     return;
+                 }
 -                ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++                cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++                hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
+ 
 -                File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
 -                if (lockfiledir == null)
 -                    throw new IOError(new IOException("All disks full"));
 -                StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
 -                lockfile.create(task.sstables);
                  List<SSTableReader> readers = new ArrayList<>();
 -                for (SSTableWriter writer : task.sstables)
 -                    readers.add(writer.finish(true));
 -                lockfile.delete();
 +                for (SSTableMultiWriter writer : task.sstables)
 +                {
 +                    Collection<SSTableReader> newReaders = writer.finish(true);
 +                    readers.addAll(newReaders);
 +                    task.txn.update(newReaders, false);
 +                }
 +
                  task.sstables.clear();
  
                  try (Refs<SSTableReader> refs = Refs.ref(readers))
                  {
 -                    // add sstables and build secondary indexes
 -                    cfs.addSSTables(readers);
 -                    cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 -
 -                    //invalidate row and counter cache
 -                    if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
 +                    //We have a special path for views.
 +                    //Since the view requires cleaning up any pre-existing state, we must put
 +                    //all partitions through the same write path as normal mutations.
 +                    //This also ensures any 2is are also updated
 +                    if (hasViews)
                      {
 -                        List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 -                        for (SSTableReader sstable : readers)
 -                            boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
 -                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 -
 -                        if (cfs.isRowCacheEnabled())
 +                        for (SSTableReader reader : readers)
                          {
 -                            int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            try (ISSTableScanner scanner = reader.getScanner())
 +                            {
 +                                while (scanner.hasNext())
 +                                {
 +                                    try (UnfilteredRowIterator rowIterator = scanner.next())
 +                                    {
 +                                        //Apply unsafe (we will flush below before transaction is done)
 +                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
 +                                    }
 +                                }
 +                            }
                          }
 +                    }
 +                    else
 +                    {
 +                        task.txn.finish();
  
 -                        if (cfs.metadata.isCounter())
 +                        // add sstables and build secondary indexes
 +                        cfs.addSSTables(readers);
 +                        cfs.indexManager.buildAllIndexesBlocking(readers);
 +
 +                        //invalidate row and counter cache
 +                        if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                          {
 -                            int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 +                            readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
 +                            Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 +
 +                            if (cfs.isRowCacheEnabled())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
 +
 +                            if (cfs.metadata.isCounter())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
                          }
-                     }
-                 }
-                 catch (Throwable t)
-                 {
-                     logger.error("Error applying streamed sstable: ", t);
- 
-                     JVMStabilityInspector.inspectThrowable(t);
-                 }
-                 finally
-                 {
-                     //We don't keep the streamed sstables since we've applied them manually
-                     //So we abort the txn and delete the streamed sstables
-                     if (hasViews)
-                     {
-                         cfs.forceBlockingFlush();
-                         task.txn.abort();
++                        task.session.taskCompleted(task);
                      }
                  }
 -
 -                task.session.taskCompleted(task);
              }
+             catch (Throwable t)
+             {
+                 logger.error("Error applying streamed data: ", t);
+                 JVMStabilityInspector.inspectThrowable(t);
+                 task.session.onError(t);
+             }
 +            finally
 +            {
-                 task.session.taskCompleted(task);
++                //We don't keep the streamed sstables since we've applied them manually
++                //So we abort the txn and delete the streamed sstables
++                if (hasViews)
++                {
++                    if (cfs != null)
++                        cfs.forceBlockingFlush();
++                    task.txn.abort();
++                }
 +            }
          }
      }
  

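The 3.0 merge above also has to reconcile the new try/catch with the materialized-view path that 3.0 added: when the target table has views, the streamed partitions are replayed through the normal write path (one Mutation per partition, applied unsafely), and a finally block flushes the table and aborts the lifecycle transaction so the streamed sstables themselves are discarded; when there are no views, the transaction is finished, the sstables are added directly and the caches invalidated, and only then is the task reported complete. A rough, self-contained sketch of that control flow (illustrative interfaces, not the real StreamSession or LifecycleTransaction API):

// Sketch only: mirrors the try/catch/finally structure of the 3.0 merge with
// hypothetical interfaces; it shows the cleanup shape, not the real classes.
public final class ViewAwareReceiveSketch
{
    interface Session { void taskCompleted(); void onError(Throwable t); }
    interface Txn     { void finish(); void abort(); }
    interface Table
    {
        boolean hasViews();
        void replayThroughWritePath();        // per-partition mutations, as required for views
        void addSSTablesAndRebuildIndexes();  // direct bulk add for the no-views case
        void forceBlockingFlush();
    }

    static void receive(Table table, Txn txn, Session session)
    {
        boolean hasViews = false;
        try
        {
            hasViews = table.hasViews();
            if (hasViews)
            {
                // Views need every partition to go through the write path so that
                // pre-existing state is cleaned up and 2i/view updates stay consistent.
                table.replayThroughWritePath();
            }
            else
            {
                txn.finish();                        // keep the streamed sstables
                table.addSSTablesAndRebuildIndexes();
                session.taskCompleted();
            }
        }
        catch (Throwable t)
        {
            session.onError(t);                      // fail the stream session on any error
        }
        finally
        {
            if (hasViews)
            {
                // Data was applied manually through the write path, so flush it and
                // abort the transaction to delete the streamed sstables.
                table.forceBlockingFlush();
                txn.abort();
            }
        }
    }
}
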

[06/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2491ede3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2491ede3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2491ede3

Branch: refs/heads/trunk
Commit: 2491ede3515f4b774069ffd645b0fb18f9c73630
Parents: 1b81ad1 5ba69a3
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:05:36 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:05:36 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 105 ++++++++++---------
 2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index af1a186,3ce2da6..7541212
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,22 -1,5 +1,23 @@@
 -2.1.12
 +2.2.4
 + * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
 + * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
 + * Reject index queries while the index is building (CASSANDRA-8505)
 + * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
 + * Fix JSON update with prepared statements (CASSANDRA-10631)
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Add proper error handling to stream receiver (CASSANDRA-10774)
   * Warn or fail when changing cluster topology live (CASSANDRA-10243)
   * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
   * Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 846524b,8773cab..dd56b8b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -37,8 -37,10 +37,9 @@@ import org.apache.cassandra.db.ColumnFa
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.dht.Bounds;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 -import org.apache.cassandra.io.sstable.SSTableWriter;
 -import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.Pair;
  
  import org.apache.cassandra.utils.concurrent.Refs;
@@@ -112,63 -117,73 +113,73 @@@ public class StreamReceiveTask extends 
  
          public void run()
          {
-             Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
-             if (kscf == null)
+             try
              {
-                 // schema was dropped during streaming
+                 Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+                 if (kscf == null)
+                 {
+                     // schema was dropped during streaming
+                     for (SSTableWriter writer : task.sstables)
+                         writer.abort();
+                     task.sstables.clear();
+                     task.session.taskCompleted(task);
+                     return;
+                 }
+                 ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ 
+                 File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+                 if (lockfiledir == null)
+                     throw new IOError(new IOException("All disks full"));
+                 StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+                 lockfile.create(task.sstables);
+                 List<SSTableReader> readers = new ArrayList<>();
                  for (SSTableWriter writer : task.sstables)
-                     writer.abort();
 -                    readers.add(writer.closeAndOpenReader());
++                    readers.add(writer.finish(true));
+                 lockfile.delete();
                  task.sstables.clear();
-                 return;
-             }
-             ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- 
-             File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
-             if (lockfiledir == null)
-                 throw new IOError(new IOException("All disks full"));
-             StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
-             lockfile.create(task.sstables);
-             List<SSTableReader> readers = new ArrayList<>();
-             for (SSTableWriter writer : task.sstables)
-                 readers.add(writer.finish(true));
-             lockfile.delete();
-             task.sstables.clear();
- 
-             try (Refs<SSTableReader> refs = Refs.ref(readers))
-             {
-                 // add sstables and build secondary indexes
-                 cfs.addSSTables(readers);
-                 cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
  
-                 //invalidate row and counter cache
-                 if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+                 try (Refs<SSTableReader> refs = Refs.ref(readers))
                  {
-                     List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
-                     for (SSTableReader sstable : readers)
-                         boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
-                     Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+                     // add sstables and build secondary indexes
+                     cfs.addSSTables(readers);
+                     cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
  
-                     if (cfs.isRowCacheEnabled())
+                     //invalidate row and counter cache
+                     if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                      {
-                         int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
-                         if (invalidatedKeys > 0)
-                             logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
-                                          "receive task completed.", task.session.planId(), invalidatedKeys,
-                                          cfs.keyspace.getName(), cfs.getColumnFamilyName());
-                     }
- 
-                     if (cfs.metadata.isCounter())
-                     {
-                         int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
-                         if (invalidatedKeys > 0)
-                             logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
-                                          "receive task completed.", task.session.planId(), invalidatedKeys,
-                                          cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                         List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+                         for (SSTableReader sstable : readers)
+                             boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+                         Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+ 
+                         if (cfs.isRowCacheEnabled())
+                         {
+                             int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+                             if (invalidatedKeys > 0)
+                                 logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+                                              "receive task completed.", task.session.planId(), invalidatedKeys,
+                                              cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                         }
+ 
+                         if (cfs.metadata.isCounter())
+                         {
+                             int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+                             if (invalidatedKeys > 0)
+                                 logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+                                              "receive task completed.", task.session.planId(), invalidatedKeys,
+                                              cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                         }
                      }
                  }
-             }
  
-             task.session.taskCompleted(task);
+                 task.session.taskCompleted(task);
+             }
+             catch (Throwable t)
+             {
+                 logger.error("Error applying streamed data: ", t);
+                 JVMStabilityInspector.inspectThrowable(t);
+                 task.session.onError(t);
+             }
          }
      }
  


[08/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2491ede3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2491ede3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2491ede3

Branch: refs/heads/cassandra-3.0
Commit: 2491ede3515f4b774069ffd645b0fb18f9c73630
Parents: 1b81ad1 5ba69a3
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:05:36 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:05:36 2015 -0600



[05/15] cassandra git commit: Add proper error handling to stream receiver

Posted by yu...@apache.org.
Add proper error handling to stream receiver

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10774


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ba69a32
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ba69a32
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ba69a32

Branch: refs/heads/cassandra-3.1
Commit: 5ba69a32590074610f5516a20b8198416b79dfcf
Parents: 7650fc1
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Nov 27 16:37:37 2015 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 11:53:35 2015 -0600


[15/15] cassandra git commit: Merge branch 'cassandra-3.1' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-3.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03863ed2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03863ed2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03863ed2

Branch: refs/heads/trunk
Commit: 03863ed2459c1c98361b500f0f24066b3c3bcc3f
Parents: 5daf8d0 5b6a368
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:08:01 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:08:01 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 60 ++++++++++----------
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/03863ed2/CHANGES.txt
----------------------------------------------------------------------


[02/15] cassandra git commit: Add proper error handling to stream receiver

Posted by yu...@apache.org.
Add proper error handling to stream receiver

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10774


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ba69a32
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ba69a32
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ba69a32

Branch: refs/heads/cassandra-2.2
Commit: 5ba69a32590074610f5516a20b8198416b79dfcf
Parents: 7650fc1
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Nov 27 16:37:37 2015 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 11:53:35 2015 -0600

-                            logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
-                                        "receive task completed.", task.session.planId(), invalidatedKeys,
-                                        cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+                        for (SSTableReader sstable : readers)
+                            boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+                        if (cfs.isRowCacheEnabled())
+                        {
+                            int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+                            if (invalidatedKeys > 0)
+                                logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+                                             "receive task completed.", task.session.planId(), invalidatedKeys,
+                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        }
+
+                        if (cfs.metadata.isCounter())
+                        {
+                            int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+                            if (invalidatedKeys > 0)
+                                logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+                                             "receive task completed.", task.session.planId(), invalidatedKeys,
+                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        }
                     }
                 }
-            }
 
-            task.session.taskCompleted(task);
+                task.session.taskCompleted(task);
+            }
+            catch (Throwable t)
+            {
+                logger.error("Error applying streamed data: ", t);
+                JVMStabilityInspector.inspectThrowable(t);
+                task.session.onError(t);
+            }
         }
     }
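
The patch above boils down to one pattern: the whole body of OnCompletionRunnable.run() is wrapped in a try/catch, so a failure while finishing the streamed sstables or adding them to the table is reported back to the stream session instead of being lost on the executor thread. A rough, self-contained sketch of that shape follows; Session is a hypothetical stand-in for the real StreamSession, and the actual work is elided:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ReceiveSketch
    {
        // Hypothetical stand-in for org.apache.cassandra.streaming.StreamSession.
        interface Session
        {
            void taskCompleted();
            void onError(Throwable t);
        }

        static Runnable onCompletion(Session session)
        {
            return () ->
            {
                try
                {
                    // finish writers, add sstables, invalidate caches ... (elided)
                    session.taskCompleted();      // reached only if everything succeeded
                }
                catch (Throwable t)
                {
                    // Before this patch an exception thrown here was never reported
                    // to the stream session; now the session is failed explicitly.
                    t.printStackTrace();
                    session.onError(t);
                }
            };
        }

        public static void main(String[] args)
        {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            Session session = new Session()
            {
                public void taskCompleted() { System.out.println("task completed"); }
                public void onError(Throwable t) { System.out.println("session failed: " + t); }
            };
            executor.execute(onCompletion(session));
            executor.shutdown();
        }
    }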
 


[10/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ccb20ad4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ccb20ad4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ccb20ad4

Branch: refs/heads/trunk
Commit: ccb20ad46ab38961aac39cc8634f450046bdf16b
Parents: 803a3d9 2491ede
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:07:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:07:39 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 60 ++++++++++----------
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7fffbbf,7541212..a01011b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -23,7 -8,16 +23,8 @@@ Merged from 2.2
   * Fix SimpleDateType type compatibility (CASSANDRA-10027)
   * (Hadoop) fix splits calculation (CASSANDRA-10640)
   * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 - * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 - * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 - * Expose phi values from failure detector via JMX and tweak debug
 -   and trace logging (CASSANDRA-9526)
 - * Fix RangeNamesQueryPager (CASSANDRA-10509)
 - * Deprecate Pig support (CASSANDRA-10542)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+  * Add proper error handling to stream receiver (CASSANDRA-10774)
   * Warn or fail when changing cluster topology live (CASSANDRA-10243)
   * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
   * Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 54ce711,dd56b8b..dfc91f9
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -126,110 -113,73 +126,110 @@@ public class StreamReceiveTask extends 
  
          public void run()
          {
-             Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
-             if (kscf == null)
-             {
-                 // schema was dropped during streaming
-                 task.sstables.forEach(SSTableMultiWriter::abortOrDie);
- 
-                 task.sstables.clear();
-                 task.txn.abort();
-                 return;
-             }
-             ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-             boolean hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
- 
++            boolean hasViews = false;
++            ColumnFamilyStore cfs = null;
              try
              {
+                 Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+                 if (kscf == null)
+                 {
+                     // schema was dropped during streaming
 -                    for (SSTableWriter writer : task.sstables)
 -                        writer.abort();
++                    task.sstables.forEach(SSTableMultiWriter::abortOrDie);
+                     task.sstables.clear();
++                    task.txn.abort();
+                     task.session.taskCompleted(task);
+                     return;
+                 }
 -                ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++                cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++                hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
+ 
 -                File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
 -                if (lockfiledir == null)
 -                    throw new IOError(new IOException("All disks full"));
 -                StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
 -                lockfile.create(task.sstables);
                  List<SSTableReader> readers = new ArrayList<>();
 -                for (SSTableWriter writer : task.sstables)
 -                    readers.add(writer.finish(true));
 -                lockfile.delete();
 +                for (SSTableMultiWriter writer : task.sstables)
 +                {
 +                    Collection<SSTableReader> newReaders = writer.finish(true);
 +                    readers.addAll(newReaders);
 +                    task.txn.update(newReaders, false);
 +                }
 +
                  task.sstables.clear();
  
                  try (Refs<SSTableReader> refs = Refs.ref(readers))
                  {
 -                    // add sstables and build secondary indexes
 -                    cfs.addSSTables(readers);
 -                    cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 -
 -                    //invalidate row and counter cache
 -                    if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
 +                    //We have a special path for views.
 +                    //Since the view requires cleaning up any pre-existing state, we must put
 +                    //all partitions through the same write path as normal mutations.
 +                    //This also ensures any 2is are also updated
 +                    if (hasViews)
                      {
 -                        List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 -                        for (SSTableReader sstable : readers)
 -                            boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
 -                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 -
 -                        if (cfs.isRowCacheEnabled())
 +                        for (SSTableReader reader : readers)
                          {
 -                            int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            try (ISSTableScanner scanner = reader.getScanner())
 +                            {
 +                                while (scanner.hasNext())
 +                                {
 +                                    try (UnfilteredRowIterator rowIterator = scanner.next())
 +                                    {
 +                                        //Apply unsafe (we will flush below before transaction is done)
 +                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
 +                                    }
 +                                }
 +                            }
                          }
 +                    }
 +                    else
 +                    {
 +                        task.txn.finish();
  
 -                        if (cfs.metadata.isCounter())
 +                        // add sstables and build secondary indexes
 +                        cfs.addSSTables(readers);
 +                        cfs.indexManager.buildAllIndexesBlocking(readers);
 +
 +                        //invalidate row and counter cache
 +                        if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                          {
 -                            int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 +                            readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
 +                            Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 +
 +                            if (cfs.isRowCacheEnabled())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
 +
 +                            if (cfs.metadata.isCounter())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
                          }
-                     }
-                 }
-                 catch (Throwable t)
-                 {
-                     logger.error("Error applying streamed sstable: ", t);
- 
-                     JVMStabilityInspector.inspectThrowable(t);
-                 }
-                 finally
-                 {
-                     //We don't keep the streamed sstables since we've applied them manually
-                     //So we abort the txn and delete the streamed sstables
-                     if (hasViews)
-                     {
-                         cfs.forceBlockingFlush();
-                         task.txn.abort();
++                        task.session.taskCompleted(task);
                      }
                  }
 -
 -                task.session.taskCompleted(task);
              }
+             catch (Throwable t)
+             {
+                 logger.error("Error applying streamed data: ", t);
+                 JVMStabilityInspector.inspectThrowable(t);
+                 task.session.onError(t);
+             }
 +            finally
 +            {
-                 task.session.taskCompleted(task);
++                //We don't keep the streamed sstables since we've applied them manually
++                //So we abort the txn and delete the streamed sstables
++                if (hasViews)
++                {
++                    if (cfs != null)
++                        cfs.forceBlockingFlush();
++                    task.txn.abort();
++                }
 +            }
          }
      }
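
The 3.0 side of this merge layers one more concern on top of the try/catch: when the target table has materialized views, the streamed partitions are replayed through the normal write path, so the sstables produced by the stream are never kept and the lifecycle transaction is aborted (after a blocking flush) in the finally block, whether or not the task succeeded. A rough sketch of that control flow, with hypothetical stand-ins for ColumnFamilyStore, the lifecycle transaction and StreamSession:

    public class ViewPathSketch
    {
        // Hypothetical stand-ins for the real Cassandra classes.
        interface Table   { boolean hasViews(); void forceBlockingFlush(); }
        interface Txn     { void finish(); void abort(); }
        interface Session { void taskCompleted(); void onError(Throwable t); }

        static void run(Table table, Txn txn, Session session)
        {
            boolean hasViews = false;
            try
            {
                hasViews = table.hasViews();
                if (hasViews)
                {
                    // replay each streamed partition as a normal mutation so the
                    // views (and any indexes) stay consistent ... (elided)
                }
                else
                {
                    txn.finish();                 // keep the streamed sstables
                    // add sstables, rebuild indexes, invalidate caches ... (elided)
                    session.taskCompleted();
                }
            }
            catch (Throwable t)
            {
                session.onError(t);               // surface the failure to the session
            }
            finally
            {
                // On the view path the streamed sstables were applied manually,
                // so flush what was written and drop them by aborting the txn.
                if (hasViews)
                {
                    table.forceBlockingFlush();
                    txn.abort();
                }
            }
        }
    }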
  


[13/15] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.1

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b6a368c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b6a368c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b6a368c

Branch: refs/heads/cassandra-3.1
Commit: 5b6a368c90233072c419e94bd3c3cb8c1362376e
Parents: 6bda886 ccb20ad
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:07:51 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:07:51 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 60 ++++++++++----------
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b6a368c/CHANGES.txt
----------------------------------------------------------------------


[12/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ccb20ad4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ccb20ad4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ccb20ad4

Branch: refs/heads/cassandra-3.0
Commit: ccb20ad46ab38961aac39cc8634f450046bdf16b
Parents: 803a3d9 2491ede
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:07:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:07:39 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 60 ++++++++++----------
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7fffbbf,7541212..a01011b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -23,7 -8,16 +23,8 @@@ Merged from 2.2
   * Fix SimpleDateType type compatibility (CASSANDRA-10027)
   * (Hadoop) fix splits calculation (CASSANDRA-10640)
   * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 - * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 - * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 - * Expose phi values from failure detector via JMX and tweak debug
 -   and trace logging (CASSANDRA-9526)
 - * Fix RangeNamesQueryPager (CASSANDRA-10509)
 - * Deprecate Pig support (CASSANDRA-10542)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+  * Add proper error handling to stream receiver (CASSANDRA-10774)
   * Warn or fail when changing cluster topology live (CASSANDRA-10243)
   * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
   * Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccb20ad4/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 54ce711,dd56b8b..dfc91f9
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -126,110 -113,73 +126,110 @@@ public class StreamReceiveTask extends 
  
          public void run()
          {
-             Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
-             if (kscf == null)
-             {
-                 // schema was dropped during streaming
-                 task.sstables.forEach(SSTableMultiWriter::abortOrDie);
- 
-                 task.sstables.clear();
-                 task.txn.abort();
-                 return;
-             }
-             ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-             boolean hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
- 
++            boolean hasViews = false;
++            ColumnFamilyStore cfs = null;
              try
              {
+                 Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+                 if (kscf == null)
+                 {
+                     // schema was dropped during streaming
 -                    for (SSTableWriter writer : task.sstables)
 -                        writer.abort();
++                    task.sstables.forEach(SSTableMultiWriter::abortOrDie);
+                     task.sstables.clear();
++                    task.txn.abort();
+                     task.session.taskCompleted(task);
+                     return;
+                 }
 -                ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++                cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
++                hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
+ 
 -                File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
 -                if (lockfiledir == null)
 -                    throw new IOError(new IOException("All disks full"));
 -                StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
 -                lockfile.create(task.sstables);
                  List<SSTableReader> readers = new ArrayList<>();
 -                for (SSTableWriter writer : task.sstables)
 -                    readers.add(writer.finish(true));
 -                lockfile.delete();
 +                for (SSTableMultiWriter writer : task.sstables)
 +                {
 +                    Collection<SSTableReader> newReaders = writer.finish(true);
 +                    readers.addAll(newReaders);
 +                    task.txn.update(newReaders, false);
 +                }
 +
                  task.sstables.clear();
  
                  try (Refs<SSTableReader> refs = Refs.ref(readers))
                  {
 -                    // add sstables and build secondary indexes
 -                    cfs.addSSTables(readers);
 -                    cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 -
 -                    //invalidate row and counter cache
 -                    if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
 +                    //We have a special path for views.
 +                    //Since the view requires cleaning up any pre-existing state, we must put
 +                    //all partitions through the same write path as normal mutations.
 +                    //This also ensures any 2is are also updated
 +                    if (hasViews)
                      {
 -                        List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 -                        for (SSTableReader sstable : readers)
 -                            boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
 -                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 -
 -                        if (cfs.isRowCacheEnabled())
 +                        for (SSTableReader reader : readers)
                          {
 -                            int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            try (ISSTableScanner scanner = reader.getScanner())
 +                            {
 +                                while (scanner.hasNext())
 +                                {
 +                                    try (UnfilteredRowIterator rowIterator = scanner.next())
 +                                    {
 +                                        //Apply unsafe (we will flush below before transaction is done)
 +                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
 +                                    }
 +                                }
 +                            }
                          }
 +                    }
 +                    else
 +                    {
 +                        task.txn.finish();
  
 -                        if (cfs.metadata.isCounter())
 +                        // add sstables and build secondary indexes
 +                        cfs.addSSTables(readers);
 +                        cfs.indexManager.buildAllIndexesBlocking(readers);
 +
 +                        //invalidate row and counter cache
 +                        if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                          {
 -                            int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 -                            if (invalidatedKeys > 0)
 -                                logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 -                                             "receive task completed.", task.session.planId(), invalidatedKeys,
 -                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
 +                            List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
 +                            readers.forEach(sstable -> boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
 +                            Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
 +
 +                            if (cfs.isRowCacheEnabled())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
 +
 +                            if (cfs.metadata.isCounter())
 +                            {
 +                                int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
 +                                if (invalidatedKeys > 0)
 +                                    logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
 +                                                 "receive task completed.", task.session.planId(), invalidatedKeys,
 +                                                 cfs.keyspace.getName(), cfs.getTableName());
 +                            }
                          }
-                     }
-                 }
-                 catch (Throwable t)
-                 {
-                     logger.error("Error applying streamed sstable: ", t);
- 
-                     JVMStabilityInspector.inspectThrowable(t);
-                 }
-                 finally
-                 {
-                     //We don't keep the streamed sstables since we've applied them manually
-                     //So we abort the txn and delete the streamed sstables
-                     if (hasViews)
-                     {
-                         cfs.forceBlockingFlush();
-                         task.txn.abort();
++                        task.session.taskCompleted(task);
                      }
                  }
 -
 -                task.session.taskCompleted(task);
              }
+             catch (Throwable t)
+             {
+                 logger.error("Error applying streamed data: ", t);
+                 JVMStabilityInspector.inspectThrowable(t);
+                 task.session.onError(t);
+             }
 +            finally
 +            {
-                 task.session.taskCompleted(task);
++                //We don't keep the streamed sstables since we've applied them manually
++                //So we abort the txn and delete the streamed sstables
++                if (hasViews)
++                {
++                    if (cfs != null)
++                        cfs.forceBlockingFlush();
++                    task.txn.abort();
++                }
 +            }
          }
      }
  


[07/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2491ede3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2491ede3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2491ede3

Branch: refs/heads/cassandra-2.2
Commit: 2491ede3515f4b774069ffd645b0fb18f9c73630
Parents: 1b81ad1 5ba69a3
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:05:36 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:05:36 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 105 ++++++++++---------
 2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index af1a186,3ce2da6..7541212
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,22 -1,5 +1,23 @@@
 -2.1.12
 +2.2.4
 + * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
 + * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
 + * Reject index queries while the index is building (CASSANDRA-8505)
 + * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
 + * Fix JSON update with prepared statements (CASSANDRA-10631)
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Add proper error handling to stream receiver (CASSANDRA-10774)
   * Warn or fail when changing cluster topology live (CASSANDRA-10243)
   * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
   * Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 846524b,8773cab..dd56b8b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -37,8 -37,10 +37,9 @@@ import org.apache.cassandra.db.ColumnFa
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.dht.Bounds;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 -import org.apache.cassandra.io.sstable.SSTableWriter;
 -import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.Pair;
  
  import org.apache.cassandra.utils.concurrent.Refs;
@@@ -112,63 -117,73 +113,73 @@@ public class StreamReceiveTask extends 
  
          public void run()
          {
-             Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
-             if (kscf == null)
+             try
              {
-                 // schema was dropped during streaming
+                 Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+                 if (kscf == null)
+                 {
+                     // schema was dropped during streaming
+                     for (SSTableWriter writer : task.sstables)
+                         writer.abort();
+                     task.sstables.clear();
+                     task.session.taskCompleted(task);
+                     return;
+                 }
+                 ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ 
+                 File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+                 if (lockfiledir == null)
+                     throw new IOError(new IOException("All disks full"));
+                 StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+                 lockfile.create(task.sstables);
+                 List<SSTableReader> readers = new ArrayList<>();
                  for (SSTableWriter writer : task.sstables)
-                     writer.abort();
 -                    readers.add(writer.closeAndOpenReader());
++                    readers.add(writer.finish(true));
+                 lockfile.delete();
                  task.sstables.clear();
-                 return;
-             }
-             ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- 
-             File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
-             if (lockfiledir == null)
-                 throw new IOError(new IOException("All disks full"));
-             StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
-             lockfile.create(task.sstables);
-             List<SSTableReader> readers = new ArrayList<>();
-             for (SSTableWriter writer : task.sstables)
-                 readers.add(writer.finish(true));
-             lockfile.delete();
-             task.sstables.clear();
- 
-             try (Refs<SSTableReader> refs = Refs.ref(readers))
-             {
-                 // add sstables and build secondary indexes
-                 cfs.addSSTables(readers);
-                 cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
  
-                 //invalidate row and counter cache
-                 if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+                 try (Refs<SSTableReader> refs = Refs.ref(readers))
                  {
-                     List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
-                     for (SSTableReader sstable : readers)
-                         boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
-                     Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+                     // add sstables and build secondary indexes
+                     cfs.addSSTables(readers);
+                     cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
  
-                     if (cfs.isRowCacheEnabled())
+                     //invalidate row and counter cache
+                     if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                      {
-                         int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
-                         if (invalidatedKeys > 0)
-                             logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
-                                          "receive task completed.", task.session.planId(), invalidatedKeys,
-                                          cfs.keyspace.getName(), cfs.getColumnFamilyName());
-                     }
- 
-                     if (cfs.metadata.isCounter())
-                     {
-                         int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
-                         if (invalidatedKeys > 0)
-                             logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
-                                          "receive task completed.", task.session.planId(), invalidatedKeys,
-                                          cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                         List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+                         for (SSTableReader sstable : readers)
+                             boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+                         Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+ 
+                         if (cfs.isRowCacheEnabled())
+                         {
+                             int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+                             if (invalidatedKeys > 0)
+                                 logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+                                              "receive task completed.", task.session.planId(), invalidatedKeys,
+                                              cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                         }
+ 
+                         if (cfs.metadata.isCounter())
+                         {
+                             int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+                             if (invalidatedKeys > 0)
+                                 logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+                                              "receive task completed.", task.session.planId(), invalidatedKeys,
+                                              cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                         }
                      }
                  }
-             }
  
-             task.session.taskCompleted(task);
+                 task.session.taskCompleted(task);
+             }
+             catch (Throwable t)
+             {
+                 logger.error("Error applying streamed data: ", t);
+                 JVMStabilityInspector.inspectThrowable(t);
+                 task.session.onError(t);
+             }
          }
      }
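
One step every branch shares is the cache invalidation at the end of the happy path: the first/last token of each streamed sstable becomes a Bounds<Token>, and those bounds are collapsed into non-overlapping ranges before the row and counter caches are invalidated, so overlapping ranges are not invalidated more than once. The collapsing is ordinary interval merging; here is a small illustration with longs standing in for tokens (Interval and nonOverlapping are invented names for the sketch, not Cassandra's Bounds API):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class BoundsSketch
    {
        // Closed interval [first, last]; a stand-in for Bounds<Token>.
        static final class Interval
        {
            final long first, last;
            Interval(long first, long last) { this.first = first; this.last = last; }
            public String toString() { return "[" + first + ", " + last + "]"; }
        }

        // Merge overlapping (or touching) intervals into a disjoint set.
        static List<Interval> nonOverlapping(List<Interval> bounds)
        {
            List<Interval> sorted = new ArrayList<>(bounds);
            sorted.sort(Comparator.comparingLong(b -> b.first));
            List<Interval> merged = new ArrayList<>();
            for (Interval b : sorted)
            {
                Interval tail = merged.isEmpty() ? null : merged.get(merged.size() - 1);
                if (tail == null || b.first > tail.last)
                    merged.add(b);                                   // disjoint: start a new range
                else if (b.last > tail.last)
                    merged.set(merged.size() - 1, new Interval(tail.first, b.last)); // extend the tail
            }
            return merged;
        }

        public static void main(String[] args)
        {
            List<Interval> perSSTable = new ArrayList<>();
            perSSTable.add(new Interval(10, 50));
            perSSTable.add(new Interval(40, 90));    // overlaps the first
            perSSTable.add(new Interval(200, 250));  // disjoint
            System.out.println(nonOverlapping(perSSTable));  // prints [[10, 90], [200, 250]]
        }
    }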
  


[03/15] cassandra git commit: Add proper error handling to stream receiver

Posted by yu...@apache.org.
Add proper error handling to stream receiver

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10774


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ba69a32
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ba69a32
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ba69a32

Branch: refs/heads/trunk
Commit: 5ba69a32590074610f5516a20b8198416b79dfcf
Parents: 7650fc1
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Nov 27 16:37:37 2015 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 11:53:35 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 105 ++++++++++---------
 2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a2f7b6e..3ce2da6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
  * Warn or fail when changing cluster topology live (CASSANDRA-10243)
  * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
  * Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 738c93c..8773cab 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
 
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -116,63 +117,73 @@ public class StreamReceiveTask extends StreamTask
 
         public void run()
         {
-            Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
-            if (kscf == null)
+            try
             {
-                // schema was dropped during streaming
+                Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+                if (kscf == null)
+                {
+                    // schema was dropped during streaming
+                    for (SSTableWriter writer : task.sstables)
+                        writer.abort();
+                    task.sstables.clear();
+                    task.session.taskCompleted(task);
+                    return;
+                }
+                ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+                File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+                if (lockfiledir == null)
+                    throw new IOError(new IOException("All disks full"));
+                StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+                lockfile.create(task.sstables);
+                List<SSTableReader> readers = new ArrayList<>();
                 for (SSTableWriter writer : task.sstables)
-                    writer.abort();
+                    readers.add(writer.closeAndOpenReader());
+                lockfile.delete();
                 task.sstables.clear();
-                return;
-            }
-            ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
-            File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
-            if (lockfiledir == null)
-                throw new IOError(new IOException("All disks full"));
-            StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
-            lockfile.create(task.sstables);
-            List<SSTableReader> readers = new ArrayList<>();
-            for (SSTableWriter writer : task.sstables)
-                readers.add(writer.closeAndOpenReader());
-            lockfile.delete();
-            task.sstables.clear();
-
-            try (Refs<SSTableReader> refs = Refs.ref(readers))
-            {
-                // add sstables and build secondary indexes
-                cfs.addSSTables(readers);
-                cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 
-                //invalidate row and counter cache
-                if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+                try (Refs<SSTableReader> refs = Refs.ref(readers))
                 {
-                    List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
-                    for (SSTableReader sstable : readers)
-                        boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
-                    Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+                    // add sstables and build secondary indexes
+                    cfs.addSSTables(readers);
+                    cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 
-                    if (cfs.isRowCacheEnabled())
+                    //invalidate row and counter cache
+                    if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                     {
-                        int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
-                        if (invalidatedKeys > 0)
-                            logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
-                                         "receive task completed.", task.session.planId(), invalidatedKeys,
-                                         cfs.keyspace.getName(), cfs.getColumnFamilyName());
-                    }
-
-                    if (cfs.metadata.isCounter())
-                    {
-                        int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
-                        if (invalidatedKeys > 0)
-                            logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
-                                        "receive task completed.", task.session.planId(), invalidatedKeys,
-                                        cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+                        for (SSTableReader sstable : readers)
+                            boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+                        if (cfs.isRowCacheEnabled())
+                        {
+                            int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+                            if (invalidatedKeys > 0)
+                                logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+                                             "receive task completed.", task.session.planId(), invalidatedKeys,
+                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        }
+
+                        if (cfs.metadata.isCounter())
+                        {
+                            int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+                            if (invalidatedKeys > 0)
+                                logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+                                             "receive task completed.", task.session.planId(), invalidatedKeys,
+                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        }
                     }
                 }
-            }
 
-            task.session.taskCompleted(task);
+                task.session.taskCompleted(task);
+            }
+            catch (Throwable t)
+            {
+                logger.error("Error applying streamed data: ", t);
+                JVMStabilityInspector.inspectThrowable(t);
+                task.session.onError(t);
+            }
         }
     }
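
Also worth noting in the 2.1 code above: before finishing the writers it asks Directories for a writeable data location with at least task.sstables.size() * 256 bytes available (that directory then hosts the stream lockfile) and throws an IOError wrapping "All disks full" if no data directory qualifies. Conceptually that is a free-space check over the configured data directories; a hypothetical illustration using only the JDK (getWriteableLocation is an invented helper, not the Directories API):

    import java.io.File;
    import java.io.IOError;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    public class DiskSpaceSketch
    {
        // Return the first candidate directory with more than writeSize bytes free,
        // or null if every disk is too full -- mirroring the null check in the patch.
        static File getWriteableLocation(List<File> candidates, long writeSize)
        {
            for (File dir : candidates)
                if (dir.getUsableSpace() > writeSize)
                    return dir;
            return null;
        }

        public static void main(String[] args)
        {
            int sstableCount = 4;                                   // hypothetical task.sstables.size()
            List<File> dataDirs = Arrays.asList(new File("/tmp")); // hypothetical data directories
            File lockfileDir = getWriteableLocation(dataDirs, sstableCount * 256L);
            if (lockfileDir == null)
                throw new IOError(new IOException("All disks full"));
            System.out.println("stream lockfile directory: " + lockfileDir);
        }
    }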
 


[04/15] cassandra git commit: Add proper error handling to stream receiver

Posted by yu...@apache.org.
Add proper error handling to stream receiver

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10774


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ba69a32
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ba69a32
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ba69a32

Branch: refs/heads/cassandra-3.0
Commit: 5ba69a32590074610f5516a20b8198416b79dfcf
Parents: 7650fc1
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Nov 27 16:37:37 2015 -0800
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 11:53:35 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 105 ++++++++++---------
 2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a2f7b6e..3ce2da6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Add proper error handling to stream receiver (CASSANDRA-10774)
  * Warn or fail when changing cluster topology live (CASSANDRA-10243)
  * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
  * Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ba69a32/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 738c93c..8773cab 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
 
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -116,63 +117,73 @@ public class StreamReceiveTask extends StreamTask
 
         public void run()
         {
-            Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
-            if (kscf == null)
+            try
             {
-                // schema was dropped during streaming
+                Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+                if (kscf == null)
+                {
+                    // schema was dropped during streaming
+                    for (SSTableWriter writer : task.sstables)
+                        writer.abort();
+                    task.sstables.clear();
+                    task.session.taskCompleted(task);
+                    return;
+                }
+                ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+                File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+                if (lockfiledir == null)
+                    throw new IOError(new IOException("All disks full"));
+                StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+                lockfile.create(task.sstables);
+                List<SSTableReader> readers = new ArrayList<>();
                 for (SSTableWriter writer : task.sstables)
-                    writer.abort();
+                    readers.add(writer.closeAndOpenReader());
+                lockfile.delete();
                 task.sstables.clear();
-                return;
-            }
-            ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-
-            File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
-            if (lockfiledir == null)
-                throw new IOError(new IOException("All disks full"));
-            StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
-            lockfile.create(task.sstables);
-            List<SSTableReader> readers = new ArrayList<>();
-            for (SSTableWriter writer : task.sstables)
-                readers.add(writer.closeAndOpenReader());
-            lockfile.delete();
-            task.sstables.clear();
-
-            try (Refs<SSTableReader> refs = Refs.ref(readers))
-            {
-                // add sstables and build secondary indexes
-                cfs.addSSTables(readers);
-                cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 
-                //invalidate row and counter cache
-                if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+                try (Refs<SSTableReader> refs = Refs.ref(readers))
                 {
-                    List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
-                    for (SSTableReader sstable : readers)
-                        boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
-                    Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+                    // add sstables and build secondary indexes
+                    cfs.addSSTables(readers);
+                    cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
 
-                    if (cfs.isRowCacheEnabled())
+                    //invalidate row and counter cache
+                    if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                     {
-                        int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
-                        if (invalidatedKeys > 0)
-                            logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
-                                         "receive task completed.", task.session.planId(), invalidatedKeys,
-                                         cfs.keyspace.getName(), cfs.getColumnFamilyName());
-                    }
-
-                    if (cfs.metadata.isCounter())
-                    {
-                        int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
-                        if (invalidatedKeys > 0)
-                            logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
-                                        "receive task completed.", task.session.planId(), invalidatedKeys,
-                                        cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+                        for (SSTableReader sstable : readers)
+                            boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+                        Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+                        if (cfs.isRowCacheEnabled())
+                        {
+                            int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+                            if (invalidatedKeys > 0)
+                                logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+                                             "receive task completed.", task.session.planId(), invalidatedKeys,
+                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        }
+
+                        if (cfs.metadata.isCounter())
+                        {
+                            int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+                            if (invalidatedKeys > 0)
+                                logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+                                             "receive task completed.", task.session.planId(), invalidatedKeys,
+                                             cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                        }
                     }
                 }
-            }
 
-            task.session.taskCompleted(task);
+                task.session.taskCompleted(task);
+            }
+            catch (Throwable t)
+            {
+                logger.error("Error applying streamed data: ", t);
+                JVMStabilityInspector.inspectThrowable(t);
+                task.session.onError(t);
+            }
         }
     }
 

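In isolation, the shape of the change above is simple: run() now wraps the whole
receive path in a try/catch, completes the task only on success, and reports any
Throwable back to the stream session (after letting JVMStabilityInspector react
to JVM-fatal errors). A minimal standalone sketch of that pattern follows; the
Session/Writer interfaces and class name are illustrative stand-ins, not the
actual Cassandra types.

    import java.util.ArrayList;
    import java.util.List;

    public class ReceiveTaskSketch
    {
        // Illustrative stand-ins for the stream session and sstable writer used above.
        interface Session { void taskCompleted(); void onError(Throwable t); }
        interface Writer  { void finishWrite(); }

        private final Session session;
        private final List<Writer> writers = new ArrayList<>();

        public ReceiveTaskSketch(Session session) { this.session = session; }

        public void run()
        {
            try
            {
                for (Writer writer : writers)
                    writer.finishWrite();      // any step in here may throw
                writers.clear();
                session.taskCompleted();       // success path, as in the patch
            }
            catch (Throwable t)
            {
                // Mirrors the patch: log the failure, give a stability check a chance
                // to handle JVM-fatal errors (JVMStabilityInspector in the real code),
                // then fail the stream session instead of swallowing the error.
                System.err.println("Error applying streamed data: " + t);
                session.onError(t);
            }
        }
    }

The key point is that taskCompleted() is only reached when every step succeeds;
any Throwable now reaches session.onError(), so the session is told about the
failure rather than left waiting on a task that never completes.
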

[14/15] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.1

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b6a368c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b6a368c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b6a368c

Branch: refs/heads/trunk
Commit: 5b6a368c90233072c419e94bd3c3cb8c1362376e
Parents: 6bda886 ccb20ad
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:07:51 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:07:51 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 60 ++++++++++----------
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b6a368c/CHANGES.txt
----------------------------------------------------------------------


[09/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2491ede3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2491ede3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2491ede3

Branch: refs/heads/cassandra-3.1
Commit: 2491ede3515f4b774069ffd645b0fb18f9c73630
Parents: 1b81ad1 5ba69a3
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Dec 1 13:05:36 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Dec 1 13:05:36 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 105 ++++++++++---------
 2 files changed, 59 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index af1a186,3ce2da6..7541212
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,22 -1,5 +1,23 @@@
 -2.1.12
 +2.2.4
 + * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
 + * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
 + * Reject index queries while the index is building (CASSANDRA-8505)
 + * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
 + * Fix JSON update with prepared statements (CASSANDRA-10631)
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Add proper error handling to stream receiver (CASSANDRA-10774)
   * Warn or fail when changing cluster topology live (CASSANDRA-10243)
   * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)
   * Some DROP ... IF EXISTS incorrectly result in exceptions on non-existing KS (CASSANDRA-10658)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2491ede3/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 846524b,8773cab..dd56b8b
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -37,8 -37,10 +37,9 @@@ import org.apache.cassandra.db.ColumnFa
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.dht.Bounds;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 -import org.apache.cassandra.io.sstable.SSTableWriter;
 -import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.Pair;
  
  import org.apache.cassandra.utils.concurrent.Refs;
@@@ -112,63 -117,73 +113,73 @@@ public class StreamReceiveTask extends 
  
          public void run()
          {
-             Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
-             if (kscf == null)
+             try
              {
-                 // schema was dropped during streaming
+                 Pair<String, String> kscf = Schema.instance.getCF(task.cfId);
+                 if (kscf == null)
+                 {
+                     // schema was dropped during streaming
+                     for (SSTableWriter writer : task.sstables)
+                         writer.abort();
+                     task.sstables.clear();
+                     task.session.taskCompleted(task);
+                     return;
+                 }
+                 ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ 
+                 File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
+                 if (lockfiledir == null)
+                     throw new IOError(new IOException("All disks full"));
+                 StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
+                 lockfile.create(task.sstables);
+                 List<SSTableReader> readers = new ArrayList<>();
                  for (SSTableWriter writer : task.sstables)
-                     writer.abort();
 -                    readers.add(writer.closeAndOpenReader());
++                    readers.add(writer.finish(true));
+                 lockfile.delete();
                  task.sstables.clear();
-                 return;
-             }
-             ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- 
-             File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256L);
-             if (lockfiledir == null)
-                 throw new IOError(new IOException("All disks full"));
-             StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
-             lockfile.create(task.sstables);
-             List<SSTableReader> readers = new ArrayList<>();
-             for (SSTableWriter writer : task.sstables)
-                 readers.add(writer.finish(true));
-             lockfile.delete();
-             task.sstables.clear();
- 
-             try (Refs<SSTableReader> refs = Refs.ref(readers))
-             {
-                 // add sstables and build secondary indexes
-                 cfs.addSSTables(readers);
-                 cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
  
-                 //invalidate row and counter cache
-                 if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+                 try (Refs<SSTableReader> refs = Refs.ref(readers))
                  {
-                     List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
-                     for (SSTableReader sstable : readers)
-                         boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
-                     Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+                     // add sstables and build secondary indexes
+                     cfs.addSSTables(readers);
+                     cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
  
-                     if (cfs.isRowCacheEnabled())
+                     //invalidate row and counter cache
+                     if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
                      {
-                         int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
-                         if (invalidatedKeys > 0)
-                             logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
-                                          "receive task completed.", task.session.planId(), invalidatedKeys,
-                                          cfs.keyspace.getName(), cfs.getColumnFamilyName());
-                     }
- 
-                     if (cfs.metadata.isCounter())
-                     {
-                         int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
-                         if (invalidatedKeys > 0)
-                             logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
-                                          "receive task completed.", task.session.planId(), invalidatedKeys,
-                                          cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                         List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+                         for (SSTableReader sstable : readers)
+                             boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+                         Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+ 
+                         if (cfs.isRowCacheEnabled())
+                         {
+                             int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+                             if (invalidatedKeys > 0)
+                                 logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+                                              "receive task completed.", task.session.planId(), invalidatedKeys,
+                                              cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                         }
+ 
+                         if (cfs.metadata.isCounter())
+                         {
+                             int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+                             if (invalidatedKeys > 0)
+                                 logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+                                              "receive task completed.", task.session.planId(), invalidatedKeys,
+                                              cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                         }
                      }
                  }
-             }
  
-             task.session.taskCompleted(task);
+                 task.session.taskCompleted(task);
+             }
+             catch (Throwable t)
+             {
+                 logger.error("Error applying streamed data: ", t);
+                 JVMStabilityInspector.inspectThrowable(t);
+                 task.session.onError(t);
+             }
          }
      }
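
One detail in the merge resolution above: the 2.1 hunk earlier in this thread
closes each writer with closeAndOpenReader(), while the combined hunk keeps the
2.2-style call writer.finish(true) to obtain the reader. A small hedged sketch of
that branch-specific difference; the Writer/Reader types below are placeholders,
not the real SSTableWriter/SSTableReader API.

    // Placeholder types; the real API lives under org.apache.cassandra.io.sstable.
    public final class FinishWriterSketch
    {
        interface Reader {}
        interface Writer
        {
            Reader closeAndOpenReader();       // 2.1-style, as in the first hunk above
            Reader finish(boolean openResult); // 2.2+-style, as kept by this merge
        }

        static Reader open(Writer writer, boolean isPre22)
        {
            // The merge above resolves the conflict by keeping the 2.2+ call.
            return isPre22 ? writer.closeAndOpenReader() : writer.finish(true);
        }
    }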