You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/02/28 22:33:44 UTC

[1/6] git commit: Fix NPE on BulkLoader caused by losing StreamEvent

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 fd53628cb -> b3a9a4434
  refs/heads/cassandra-2.1 bbad16b7e -> dcca99684
  refs/heads/trunk e449450b8 -> f6dff616f


Fix NPE on BulkLoader caused by losing StreamEvent

patch by yukim; reviewed by sankalp kohli for CASSANDRA-6636


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b3a9a443
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b3a9a443
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b3a9a443

Branch: refs/heads/cassandra-2.0
Commit: b3a9a443433a271fee33bede60d4892e0c8ffb03
Parents: fd53628
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Feb 28 15:28:25 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Feb 28 15:28:25 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                              |  1 +
 .../org/apache/cassandra/io/sstable/SSTableLoader.java   |  7 +++----
 src/java/org/apache/cassandra/streaming/StreamPlan.java  | 11 ++++++++++-
 .../apache/cassandra/streaming/StreamResultFuture.java   |  7 ++++++-
 src/java/org/apache/cassandra/tools/BulkLoader.java      |  7 ++++---
 5 files changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d6c9ae6..3e73f91 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,7 @@
  * Optimize single partition batch statements (CASSANDRA-6737)
  * Disallow post-query re-ordering when paging (CASSANDRA-6722)
  * Fix potential paging bug with deleted columns (CASSANDRA-6748)
+ * Fix NPE on BulkLoader caused by losing StreamEvent (CASSANDRA-6636)
 Merged from 1.2:
  * Add CMSClassUnloadingEnabled JVM option (CASSANDRA-6541)
  * Catch memtable flush exceptions during shutdown (CASSANDRA-6735)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index f867317..1ea4c55 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -144,7 +144,7 @@ public class SSTableLoader implements StreamEventHandler
         return stream(Collections.<InetAddress>emptySet());
     }
 
-    public StreamResultFuture stream(Set<InetAddress> toIgnore)
+    public StreamResultFuture stream(Set<InetAddress> toIgnore, StreamEventHandler... listeners)
     {
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
@@ -175,9 +175,8 @@ public class SSTableLoader implements StreamEventHandler
 
             plan.transferFiles(remote, streamingDetails.get(remote));
         }
-        StreamResultFuture bulkResult = plan.execute();
-        bulkResult.addEventListener(this);
-        return bulkResult;
+        plan.listeners(this, listeners);
+        return plan.execute();
     }
 
     public void onSuccess(StreamState finalState) {}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 288929c..740ad66 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -33,6 +33,7 @@ public class StreamPlan
 {
     private final UUID planId = UUIDGen.getTimeUUID();
     private final String description;
+    private final List<StreamEventHandler> handlers = new ArrayList<>();
 
     // sessions per InetAddress of the other end.
     private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
@@ -121,6 +122,14 @@ public class StreamPlan
         return this;
     }
 
+    public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers)
+    {
+        this.handlers.add(handler);
+        if (handlers != null)
+            Collections.addAll(this.handlers, handlers);
+        return this;
+    }
+
     /**
      * @return true if this plan has no plan to execute
      */
@@ -136,7 +145,7 @@ public class StreamPlan
      */
     public StreamResultFuture execute()
     {
-        return StreamResultFuture.init(planId, description, sessions.values());
+        return StreamResultFuture.init(planId, description, sessions.values(), handlers);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index ccd3c92..dcffaff 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -75,9 +75,14 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             set(getCurrentState());
     }
 
-    static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions)
+    static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions, Collection<StreamEventHandler> listeners)
     {
         StreamResultFuture future = createAndRegister(planId, description, sessions);
+        if (listeners != null)
+        {
+            for (StreamEventHandler listener : listeners)
+                future.addEventListener(listener);
+        }
 
         logger.info("[Stream #{}] Executing streaming plan for {}", planId,  description);
         // start sessions

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 6c157e2..37ec635 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -79,7 +79,10 @@ public class BulkLoader
         StreamResultFuture future = null;
         try
         {
-            future = loader.stream(options.ignores);
+            if (options.noProgress)
+                future = loader.stream(options.ignores);
+            else
+                future = loader.stream(options.ignores, new ProgressIndicator());
         }
         catch (Exception e)
         {
@@ -94,8 +97,6 @@ public class BulkLoader
         }
 
         handler.output(String.format("Streaming session ID: %s", future.planId));
-        if (!options.noProgress)
-            future.addEventListener(new ProgressIndicator());
 
         try
         {


[3/6] git commit: Fix NPE on BulkLoader caused by losing StreamEvent

Posted by yu...@apache.org.
Fix NPE on BulkLoader caused by losing StreamEvent

patch by yukim; reviewed by sankalp kohli for CASSANDRA-6636


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b3a9a443
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b3a9a443
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b3a9a443

Branch: refs/heads/trunk
Commit: b3a9a443433a271fee33bede60d4892e0c8ffb03
Parents: fd53628
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Feb 28 15:28:25 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Feb 28 15:28:25 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                              |  1 +
 .../org/apache/cassandra/io/sstable/SSTableLoader.java   |  7 +++----
 src/java/org/apache/cassandra/streaming/StreamPlan.java  | 11 ++++++++++-
 .../apache/cassandra/streaming/StreamResultFuture.java   |  7 ++++++-
 src/java/org/apache/cassandra/tools/BulkLoader.java      |  7 ++++---
 5 files changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d6c9ae6..3e73f91 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,7 @@
  * Optimize single partition batch statements (CASSANDRA-6737)
  * Disallow post-query re-ordering when paging (CASSANDRA-6722)
  * Fix potential paging bug with deleted columns (CASSANDRA-6748)
+ * Fix NPE on BulkLoader caused by losing StreamEvent (CASSANDRA-6636)
 Merged from 1.2:
  * Add CMSClassUnloadingEnabled JVM option (CASSANDRA-6541)
  * Catch memtable flush exceptions during shutdown (CASSANDRA-6735)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index f867317..1ea4c55 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -144,7 +144,7 @@ public class SSTableLoader implements StreamEventHandler
         return stream(Collections.<InetAddress>emptySet());
     }
 
-    public StreamResultFuture stream(Set<InetAddress> toIgnore)
+    public StreamResultFuture stream(Set<InetAddress> toIgnore, StreamEventHandler... listeners)
     {
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
@@ -175,9 +175,8 @@ public class SSTableLoader implements StreamEventHandler
 
             plan.transferFiles(remote, streamingDetails.get(remote));
         }
-        StreamResultFuture bulkResult = plan.execute();
-        bulkResult.addEventListener(this);
-        return bulkResult;
+        plan.listeners(this, listeners);
+        return plan.execute();
     }
 
     public void onSuccess(StreamState finalState) {}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 288929c..740ad66 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -33,6 +33,7 @@ public class StreamPlan
 {
     private final UUID planId = UUIDGen.getTimeUUID();
     private final String description;
+    private final List<StreamEventHandler> handlers = new ArrayList<>();
 
     // sessions per InetAddress of the other end.
     private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
@@ -121,6 +122,14 @@ public class StreamPlan
         return this;
     }
 
+    public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers)
+    {
+        this.handlers.add(handler);
+        if (handlers != null)
+            Collections.addAll(this.handlers, handlers);
+        return this;
+    }
+
     /**
      * @return true if this plan has no plan to execute
      */
@@ -136,7 +145,7 @@ public class StreamPlan
      */
     public StreamResultFuture execute()
     {
-        return StreamResultFuture.init(planId, description, sessions.values());
+        return StreamResultFuture.init(planId, description, sessions.values(), handlers);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index ccd3c92..dcffaff 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -75,9 +75,14 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             set(getCurrentState());
     }
 
-    static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions)
+    static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions, Collection<StreamEventHandler> listeners)
     {
         StreamResultFuture future = createAndRegister(planId, description, sessions);
+        if (listeners != null)
+        {
+            for (StreamEventHandler listener : listeners)
+                future.addEventListener(listener);
+        }
 
         logger.info("[Stream #{}] Executing streaming plan for {}", planId,  description);
         // start sessions

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 6c157e2..37ec635 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -79,7 +79,10 @@ public class BulkLoader
         StreamResultFuture future = null;
         try
         {
-            future = loader.stream(options.ignores);
+            if (options.noProgress)
+                future = loader.stream(options.ignores);
+            else
+                future = loader.stream(options.ignores, new ProgressIndicator());
         }
         catch (Exception e)
         {
@@ -94,8 +97,6 @@ public class BulkLoader
         }
 
         handler.output(String.format("Streaming session ID: %s", future.planId));
-        if (!options.noProgress)
-            future.addEventListener(new ProgressIndicator());
 
         try
         {


[6/6] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f6dff616
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f6dff616
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f6dff616

Branch: refs/heads/trunk
Commit: f6dff616fed3c12dd307424bd0ccbf979c25d053
Parents: e449450 dcca996
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Feb 28 15:30:46 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Feb 28 15:30:46 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                              |  1 +
 .../org/apache/cassandra/io/sstable/SSTableLoader.java   |  7 +++----
 src/java/org/apache/cassandra/streaming/StreamPlan.java  | 11 ++++++++++-
 .../apache/cassandra/streaming/StreamResultFuture.java   |  7 ++++++-
 src/java/org/apache/cassandra/tools/BulkLoader.java      |  7 ++++---
 5 files changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6dff616/CHANGES.txt
----------------------------------------------------------------------


[4/6] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by yu...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dcca9968
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dcca9968
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dcca9968

Branch: refs/heads/trunk
Commit: dcca99684f7225a94253d94220d2fb8b0e7c5e4e
Parents: bbad16b b3a9a44
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Feb 28 15:30:22 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Feb 28 15:30:22 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                              |  1 +
 .../org/apache/cassandra/io/sstable/SSTableLoader.java   |  7 +++----
 src/java/org/apache/cassandra/streaming/StreamPlan.java  | 11 ++++++++++-
 .../apache/cassandra/streaming/StreamResultFuture.java   |  7 ++++++-
 src/java/org/apache/cassandra/tools/BulkLoader.java      |  7 ++++---
 5 files changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcca9968/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1829683,3e73f91..20ae300
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -35,47 -27,24 +35,48 @@@ Merged from 2.0
   * Optimize single partition batch statements (CASSANDRA-6737)
   * Disallow post-query re-ordering when paging (CASSANDRA-6722)
   * Fix potential paging bug with deleted columns (CASSANDRA-6748)
+  * Fix NPE on BulkLoader caused by losing StreamEvent (CASSANDRA-6636)
 -Merged from 1.2:
   * Add CMSClassUnloadingEnabled JVM option (CASSANDRA-6541)
   * Catch memtable flush exceptions during shutdown (CASSANDRA-6735)
 - * Fix broken streams when replacing with same IP (CASSANDRA-6622)
   * Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645)
 - * Fix partition and range deletes not triggering flush (CASSANDRA-6655)
 - * Fix mean cells and mean row size per sstable calculations (CASSANDRA-6667)
 - * Compact hints after partial replay to clean out tombstones (CASSANDRA-6666)
 - * Log USING TTL/TIMESTAMP in a counter update warning (CASSANDRA-6649)
 - * Don't exchange schema between nodes with different versions (CASSANDRA-6695)
 - * Use real node messaging versions for schema exchange decisions (CASSANDRA-6700)
 - * IN on the last clustering columns + ORDER BY DESC yield no results (CASSANDRA-6701)
 - * Fix SecondaryIndexManager#deleteFromIndexes() (CASSANDRA-6711)
 - * Fix snapshot repair not snapshotting coordinator itself (CASSANDRA-6713)
 - * Support negative timestamps for CQL3 dates in query string (CASSANDRA-6718)
 - * Avoid NPEs when receiving table changes for an unknown keyspace (CASSANDRA-5631)
 - * Fix bootstrapping when there is no schema (CASSANDRA-6685)
 +
 +
 +2.1.0-beta1
 + * Add flush directory distinct from compaction directories (CASSANDRA-6357)
 + * Require JNA by default (CASSANDRA-6575)
 + * add listsnapshots command to nodetool (CASSANDRA-5742)
 + * Introduce AtomicBTreeColumns (CASSANDRA-6271, 6692)
 + * Multithreaded commitlog (CASSANDRA-3578)
 + * allocate fixed index summary memory pool and resample cold index summaries 
 +   to use less memory (CASSANDRA-5519)
 + * Removed multithreaded compaction (CASSANDRA-6142)
 + * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
 + * change logging from log4j to logback (CASSANDRA-5883)
 + * switch to LZ4 compression for internode communication (CASSANDRA-5887)
 + * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
 + * Remove 1.2 network compatibility code (CASSANDRA-5960)
 + * Remove leveled json manifest migration code (CASSANDRA-5996)
 + * Remove CFDefinition (CASSANDRA-6253)
 + * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
 + * User-defined types for CQL3 (CASSANDRA-5590)
 + * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
 + * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
 + * Secondary index support for collections (CASSANDRA-4511, 6383)
 + * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
 + * Push composites support in the storage engine
 +   (CASSANDRA-5417, CASSANDRA-6520)
 + * Add snapshot space used to cfstats (CASSANDRA-6231)
 + * Add cardinality estimator for key count estimation (CASSANDRA-5906)
 + * CF id is changed to be non-deterministic. Data dir/key cache are created
 +   uniquely for CF id (CASSANDRA-5202)
 + * New counters implementation (CASSANDRA-6504)
 + * Replace UnsortedColumns, EmptyColumns, TreeMapBackedSortedColumns with new
 +   ArrayBackedSortedColumns (CASSANDRA-6630, CASSANDRA-6662, CASSANDRA-6690)
 + * Add option to use row cache with a given amount of rows (CASSANDRA-5357)
 + * Avoid repairing already repaired data (CASSANDRA-5351)
 + * Reject counter updates with USING TTL/TIMESTAMP (CASSANDRA-6649)
 + * Replace index_interval with min/max_index_interval (CASSANDRA-6379)
 + * Lift limitation that order by columns must be selected for IN queries (CASSANDRA-4911)
  
  
  2.0.5

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcca9968/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcca9968/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcca9968/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------


[5/6] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by yu...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dcca9968
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dcca9968
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dcca9968

Branch: refs/heads/cassandra-2.1
Commit: dcca99684f7225a94253d94220d2fb8b0e7c5e4e
Parents: bbad16b b3a9a44
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Feb 28 15:30:22 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Feb 28 15:30:22 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                              |  1 +
 .../org/apache/cassandra/io/sstable/SSTableLoader.java   |  7 +++----
 src/java/org/apache/cassandra/streaming/StreamPlan.java  | 11 ++++++++++-
 .../apache/cassandra/streaming/StreamResultFuture.java   |  7 ++++++-
 src/java/org/apache/cassandra/tools/BulkLoader.java      |  7 ++++---
 5 files changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcca9968/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1829683,3e73f91..20ae300
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -35,47 -27,24 +35,48 @@@ Merged from 2.0
   * Optimize single partition batch statements (CASSANDRA-6737)
   * Disallow post-query re-ordering when paging (CASSANDRA-6722)
   * Fix potential paging bug with deleted columns (CASSANDRA-6748)
+  * Fix NPE on BulkLoader caused by losing StreamEvent (CASSANDRA-6636)
 -Merged from 1.2:
   * Add CMSClassUnloadingEnabled JVM option (CASSANDRA-6541)
   * Catch memtable flush exceptions during shutdown (CASSANDRA-6735)
 - * Fix broken streams when replacing with same IP (CASSANDRA-6622)
   * Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645)
 - * Fix partition and range deletes not triggering flush (CASSANDRA-6655)
 - * Fix mean cells and mean row size per sstable calculations (CASSANDRA-6667)
 - * Compact hints after partial replay to clean out tombstones (CASSANDRA-6666)
 - * Log USING TTL/TIMESTAMP in a counter update warning (CASSANDRA-6649)
 - * Don't exchange schema between nodes with different versions (CASSANDRA-6695)
 - * Use real node messaging versions for schema exchange decisions (CASSANDRA-6700)
 - * IN on the last clustering columns + ORDER BY DESC yield no results (CASSANDRA-6701)
 - * Fix SecondaryIndexManager#deleteFromIndexes() (CASSANDRA-6711)
 - * Fix snapshot repair not snapshotting coordinator itself (CASSANDRA-6713)
 - * Support negative timestamps for CQL3 dates in query string (CASSANDRA-6718)
 - * Avoid NPEs when receiving table changes for an unknown keyspace (CASSANDRA-5631)
 - * Fix bootstrapping when there is no schema (CASSANDRA-6685)
 +
 +
 +2.1.0-beta1
 + * Add flush directory distinct from compaction directories (CASSANDRA-6357)
 + * Require JNA by default (CASSANDRA-6575)
 + * add listsnapshots command to nodetool (CASSANDRA-5742)
 + * Introduce AtomicBTreeColumns (CASSANDRA-6271, 6692)
 + * Multithreaded commitlog (CASSANDRA-3578)
 + * allocate fixed index summary memory pool and resample cold index summaries 
 +   to use less memory (CASSANDRA-5519)
 + * Removed multithreaded compaction (CASSANDRA-6142)
 + * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
 + * change logging from log4j to logback (CASSANDRA-5883)
 + * switch to LZ4 compression for internode communication (CASSANDRA-5887)
 + * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
 + * Remove 1.2 network compatibility code (CASSANDRA-5960)
 + * Remove leveled json manifest migration code (CASSANDRA-5996)
 + * Remove CFDefinition (CASSANDRA-6253)
 + * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
 + * User-defined types for CQL3 (CASSANDRA-5590)
 + * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
 + * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
 + * Secondary index support for collections (CASSANDRA-4511, 6383)
 + * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
 + * Push composites support in the storage engine
 +   (CASSANDRA-5417, CASSANDRA-6520)
 + * Add snapshot space used to cfstats (CASSANDRA-6231)
 + * Add cardinality estimator for key count estimation (CASSANDRA-5906)
 + * CF id is changed to be non-deterministic. Data dir/key cache are created
 +   uniquely for CF id (CASSANDRA-5202)
 + * New counters implementation (CASSANDRA-6504)
 + * Replace UnsortedColumns, EmptyColumns, TreeMapBackedSortedColumns with new
 +   ArrayBackedSortedColumns (CASSANDRA-6630, CASSANDRA-6662, CASSANDRA-6690)
 + * Add option to use row cache with a given amount of rows (CASSANDRA-5357)
 + * Avoid repairing already repaired data (CASSANDRA-5351)
 + * Reject counter updates with USING TTL/TIMESTAMP (CASSANDRA-6649)
 + * Replace index_interval with min/max_index_interval (CASSANDRA-6379)
 + * Lift limitation that order by columns must be selected for IN queries (CASSANDRA-4911)
  
  
  2.0.5

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcca9968/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcca9968/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcca9968/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------


[2/6] git commit: Fix NPE on BulkLoader caused by losing StreamEvent

Posted by yu...@apache.org.
Fix NPE on BulkLoader caused by losing StreamEvent

patch by yukim; reviewed by sankalp kohli for CASSANDRA-6636


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b3a9a443
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b3a9a443
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b3a9a443

Branch: refs/heads/cassandra-2.1
Commit: b3a9a443433a271fee33bede60d4892e0c8ffb03
Parents: fd53628
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Feb 28 15:28:25 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Feb 28 15:28:25 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                              |  1 +
 .../org/apache/cassandra/io/sstable/SSTableLoader.java   |  7 +++----
 src/java/org/apache/cassandra/streaming/StreamPlan.java  | 11 ++++++++++-
 .../apache/cassandra/streaming/StreamResultFuture.java   |  7 ++++++-
 src/java/org/apache/cassandra/tools/BulkLoader.java      |  7 ++++---
 5 files changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d6c9ae6..3e73f91 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,7 @@
  * Optimize single partition batch statements (CASSANDRA-6737)
  * Disallow post-query re-ordering when paging (CASSANDRA-6722)
  * Fix potential paging bug with deleted columns (CASSANDRA-6748)
+ * Fix NPE on BulkLoader caused by losing StreamEvent (CASSANDRA-6636)
 Merged from 1.2:
  * Add CMSClassUnloadingEnabled JVM option (CASSANDRA-6541)
  * Catch memtable flush exceptions during shutdown (CASSANDRA-6735)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index f867317..1ea4c55 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -144,7 +144,7 @@ public class SSTableLoader implements StreamEventHandler
         return stream(Collections.<InetAddress>emptySet());
     }
 
-    public StreamResultFuture stream(Set<InetAddress> toIgnore)
+    public StreamResultFuture stream(Set<InetAddress> toIgnore, StreamEventHandler... listeners)
     {
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
@@ -175,9 +175,8 @@ public class SSTableLoader implements StreamEventHandler
 
             plan.transferFiles(remote, streamingDetails.get(remote));
         }
-        StreamResultFuture bulkResult = plan.execute();
-        bulkResult.addEventListener(this);
-        return bulkResult;
+        plan.listeners(this, listeners);
+        return plan.execute();
     }
 
     public void onSuccess(StreamState finalState) {}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 288929c..740ad66 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -33,6 +33,7 @@ public class StreamPlan
 {
     private final UUID planId = UUIDGen.getTimeUUID();
     private final String description;
+    private final List<StreamEventHandler> handlers = new ArrayList<>();
 
     // sessions per InetAddress of the other end.
     private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
@@ -121,6 +122,14 @@ public class StreamPlan
         return this;
     }
 
+    public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers)
+    {
+        this.handlers.add(handler);
+        if (handlers != null)
+            Collections.addAll(this.handlers, handlers);
+        return this;
+    }
+
     /**
      * @return true if this plan has no plan to execute
      */
@@ -136,7 +145,7 @@ public class StreamPlan
      */
     public StreamResultFuture execute()
     {
-        return StreamResultFuture.init(planId, description, sessions.values());
+        return StreamResultFuture.init(planId, description, sessions.values(), handlers);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index ccd3c92..dcffaff 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -75,9 +75,14 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             set(getCurrentState());
     }
 
-    static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions)
+    static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions, Collection<StreamEventHandler> listeners)
     {
         StreamResultFuture future = createAndRegister(planId, description, sessions);
+        if (listeners != null)
+        {
+            for (StreamEventHandler listener : listeners)
+                future.addEventListener(listener);
+        }
 
         logger.info("[Stream #{}] Executing streaming plan for {}", planId,  description);
         // start sessions

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 6c157e2..37ec635 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -79,7 +79,10 @@ public class BulkLoader
         StreamResultFuture future = null;
         try
         {
-            future = loader.stream(options.ignores);
+            if (options.noProgress)
+                future = loader.stream(options.ignores);
+            else
+                future = loader.stream(options.ignores, new ProgressIndicator());
         }
         catch (Exception e)
         {
@@ -94,8 +97,6 @@ public class BulkLoader
         }
 
         handler.output(String.format("Streaming session ID: %s", future.planId));
-        if (!options.noProgress)
-            future.addEventListener(new ProgressIndicator());
 
         try
         {