You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/02/28 22:33:45 UTC
[2/6] git commit: Fix NPE on BulkLoader caused by losing StreamEvent
Fix NPE on BulkLoader caused by losing StreamEvent
patch by yukim; reviewed by sankalp kohli for CASSANDRA-6636
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b3a9a443
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b3a9a443
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b3a9a443
Branch: refs/heads/cassandra-2.1
Commit: b3a9a443433a271fee33bede60d4892e0c8ffb03
Parents: fd53628
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Feb 28 15:28:25 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Feb 28 15:28:25 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/io/sstable/SSTableLoader.java | 7 +++----
src/java/org/apache/cassandra/streaming/StreamPlan.java | 11 ++++++++++-
.../apache/cassandra/streaming/StreamResultFuture.java | 7 ++++++-
src/java/org/apache/cassandra/tools/BulkLoader.java | 7 ++++---
5 files changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d6c9ae6..3e73f91 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,7 @@
* Optimize single partition batch statements (CASSANDRA-6737)
* Disallow post-query re-ordering when paging (CASSANDRA-6722)
* Fix potential paging bug with deleted columns (CASSANDRA-6748)
+ * Fix NPE on BulkLoader caused by losing StreamEvent (CASSANDRA-6636)
Merged from 1.2:
* Add CMSClassUnloadingEnabled JVM option (CASSANDRA-6541)
* Catch memtable flush exceptions during shutdown (CASSANDRA-6735)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index f867317..1ea4c55 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -144,7 +144,7 @@ public class SSTableLoader implements StreamEventHandler
return stream(Collections.<InetAddress>emptySet());
}
- public StreamResultFuture stream(Set<InetAddress> toIgnore)
+ public StreamResultFuture stream(Set<InetAddress> toIgnore, StreamEventHandler... listeners)
{
client.init(keyspace);
outputHandler.output("Established connection to initial hosts");
@@ -175,9 +175,8 @@ public class SSTableLoader implements StreamEventHandler
plan.transferFiles(remote, streamingDetails.get(remote));
}
- StreamResultFuture bulkResult = plan.execute();
- bulkResult.addEventListener(this);
- return bulkResult;
+ plan.listeners(this, listeners);
+ return plan.execute();
}
public void onSuccess(StreamState finalState) {}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 288929c..740ad66 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -33,6 +33,7 @@ public class StreamPlan
{
private final UUID planId = UUIDGen.getTimeUUID();
private final String description;
+ private final List<StreamEventHandler> handlers = new ArrayList<>();
// sessions per InetAddress of the other end.
private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
@@ -121,6 +122,14 @@ public class StreamPlan
return this;
}
+ public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers)
+ {
+ this.handlers.add(handler);
+ if (handlers != null)
+ Collections.addAll(this.handlers, handlers);
+ return this;
+ }
+
/**
* @return true if this plan has no plan to execute
*/
@@ -136,7 +145,7 @@ public class StreamPlan
*/
public StreamResultFuture execute()
{
- return StreamResultFuture.init(planId, description, sessions.values());
+ return StreamResultFuture.init(planId, description, sessions.values(), handlers);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index ccd3c92..dcffaff 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -75,9 +75,14 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
set(getCurrentState());
}
- static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions)
+ static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions, Collection<StreamEventHandler> listeners)
{
StreamResultFuture future = createAndRegister(planId, description, sessions);
+ if (listeners != null)
+ {
+ for (StreamEventHandler listener : listeners)
+ future.addEventListener(listener);
+ }
logger.info("[Stream #{}] Executing streaming plan for {}", planId, description);
// start sessions
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 6c157e2..37ec635 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -79,7 +79,10 @@ public class BulkLoader
StreamResultFuture future = null;
try
{
- future = loader.stream(options.ignores);
+ if (options.noProgress)
+ future = loader.stream(options.ignores);
+ else
+ future = loader.stream(options.ignores, new ProgressIndicator());
}
catch (Exception e)
{
@@ -94,8 +97,6 @@ public class BulkLoader
}
handler.output(String.format("Streaming session ID: %s", future.planId));
- if (!options.noProgress)
- future.addEventListener(new ProgressIndicator());
try
{