You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/05/02 00:53:17 UTC
[1/3] git commit: Parallel streaming for sstableloader
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 31cd61387 -> 44fa2cdb4
refs/heads/trunk f0def3412 -> 0ed57bf53
Parallel streaming for sstableloader
patch by Joshua McKenzie; reviewed by yukim for CASSANDRA-3668
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44fa2cdb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44fa2cdb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44fa2cdb
Branch: refs/heads/cassandra-2.1
Commit: 44fa2cdb48cf6b4e589d5c48f5c57f0a93c03b60
Parents: 31cd613
Author: Joshua McKenzie <jo...@datastax.com>
Authored: Thu May 1 17:51:12 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu May 1 17:51:12 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 9 +-
.../net/IncomingStreamingConnection.java | 2 +-
.../cassandra/repair/StreamingRepairTask.java | 2 +-
.../cassandra/streaming/ConnectionHandler.java | 7 +-
.../cassandra/streaming/ProgressInfo.java | 10 +-
.../apache/cassandra/streaming/SessionInfo.java | 3 +
.../cassandra/streaming/StreamCoordinator.java | 272 +++++++++++++++++++
.../apache/cassandra/streaming/StreamEvent.java | 2 +
.../apache/cassandra/streaming/StreamPlan.java | 34 +--
.../cassandra/streaming/StreamResultFuture.java | 80 +++---
.../cassandra/streaming/StreamSession.java | 42 ++-
.../management/ProgressInfoCompositeData.java | 21 +-
.../management/SessionInfoCompositeData.java | 1 +
.../streaming/messages/StreamInitMessage.java | 9 +-
.../streaming/messages/StreamMessage.java | 2 +-
.../org/apache/cassandra/tools/BulkLoader.java | 169 ++++++++----
.../cassandra/streaming/SessionInfoTest.java | 6 +-
.../streaming/StreamTransferTaskTest.java | 2 +-
19 files changed, 500 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16fcdfc..86940ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.0-rc1
+ * Parallel streaming for sstableloader (CASSANDRA-3668)
Merged from 2.0:
* Add Google Compute Engine snitch (CASSANDRA-7132)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index b14e203..bbb1277 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -48,6 +48,7 @@ public class SSTableLoader implements StreamEventHandler
private final File directory;
private final String keyspace;
private final Client client;
+ private final int connectionsPerHost;
private final OutputHandler outputHandler;
private final Set<InetAddress> failedHosts = new HashSet<>();
@@ -61,10 +62,16 @@ public class SSTableLoader implements StreamEventHandler
public SSTableLoader(File directory, Client client, OutputHandler outputHandler)
{
+ this(directory, client, outputHandler, 1);
+ }
+
+ public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost)
+ {
this.directory = directory;
this.keyspace = directory.getParentFile().getName();
this.client = client;
this.outputHandler = outputHandler;
+ this.connectionsPerHost = connectionsPerHost;
}
protected Collection<SSTableReader> openSSTables(final Map<InetAddress, Collection<Range<Token>>> ranges)
@@ -150,7 +157,7 @@ public class SSTableLoader implements StreamEventHandler
client.init(keyspace);
outputHandler.output("Established connection to initial hosts");
- StreamPlan plan = new StreamPlan("Bulk Load");
+ StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost);
Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
openSSTables(endpointToRanges);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 20392f2..003bbf9 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -62,7 +62,7 @@ public class IncomingStreamingConnection extends Thread
// The receiving side distinguishes two connections by looking at StreamInitMessage#isForOutgoing.
// Note: we cannot use the same socket for incoming and outgoing streams because we want to
// parallelize said streams and the socket is blocking, so we might deadlock.
- StreamResultFuture.initReceivingSide(init.planId, init.description, init.from, socket, init.isForOutgoing, version);
+ StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 636568c..b9184ca 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -71,7 +71,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst));
- StreamResultFuture op = new StreamPlan("Repair", repairedAt)
+ StreamResultFuture op = new StreamPlan("Repair", repairedAt, 1)
.flushBeforeTransfer(true)
// request ranges from the remote node
.requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 562645d..5484c83 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -217,7 +217,12 @@ public class ConnectionHandler
public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
{
- StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(), session.planId(), session.description(), isForOutgoing);
+ StreamInitMessage message = new StreamInitMessage(
+ FBUtilities.getBroadcastAddress(),
+ session.sessionIndex(),
+ session.planId(),
+ session.description(),
+ isForOutgoing);
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
getWriteChannel(socket).write(messageBuf);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/ProgressInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
index d308ed0..fdd3e97 100644
--- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -49,16 +49,18 @@ public class ProgressInfo implements Serializable
}
public final InetAddress peer;
+ public final int sessionIndex;
public final String fileName;
public final Direction direction;
public final long currentBytes;
public final long totalBytes;
- public ProgressInfo(InetAddress peer, String fileName, Direction direction, long currentBytes, long totalBytes)
+ public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes)
{
assert totalBytes > 0;
this.peer = peer;
+ this.sessionIndex = sessionIndex;
this.fileName = fileName;
this.direction = direction;
this.currentBytes = currentBytes;
@@ -70,7 +72,7 @@ public class ProgressInfo implements Serializable
*/
public boolean isCompleted()
{
- return currentBytes == totalBytes;
+ return currentBytes >= totalBytes;
}
/**
@@ -87,13 +89,14 @@ public class ProgressInfo implements Serializable
if (totalBytes != that.totalBytes) return false;
if (direction != that.direction) return false;
if (!fileName.equals(that.fileName)) return false;
+ if (sessionIndex != that.sessionIndex) return false;
return peer.equals(that.peer);
}
@Override
public int hashCode()
{
- return Objects.hashCode(peer, fileName, direction, totalBytes);
+ return Objects.hashCode(peer, sessionIndex, fileName, direction, totalBytes);
}
@Override
@@ -104,6 +107,7 @@ public class ProgressInfo implements Serializable
sb.append("/").append(totalBytes).append(" bytes");
sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
sb.append(direction == Direction.OUT ? "sent to " : "received from ");
+ sb.append("idx:").append(sessionIndex);
sb.append(peer);
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index b722ecf..98e945b 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Iterables;
public final class SessionInfo implements Serializable
{
public final InetAddress peer;
+ public final int sessionIndex;
/** Immutable collection of receiving summaries */
public final Collection<StreamSummary> receivingSummaries;
/** Immutable collection of sending summaries*/
@@ -44,11 +45,13 @@ public final class SessionInfo implements Serializable
private final Map<String, ProgressInfo> sendingFiles;
public SessionInfo(InetAddress peer,
+ int sessionIndex,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries,
StreamSession.State state)
{
this.peer = peer;
+ this.sessionIndex = sessionIndex;
this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
this.receivingFiles = new ConcurrentHashMap<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
new file mode 100644
index 0000000..425b5b1
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * {@link StreamCoordinator} is a helper class that abstracts away maintaining multiple
+ * StreamSession and ProgressInfo instances per peer.
+ *
+ * This class coordinates multiple StreamSessions per peer in both the outgoing StreamPlan context and the
+ * inbound StreamResultFuture context.
+ */
+public class StreamCoordinator
+{
+ private static final Logger logger = LoggerFactory.getLogger(StreamCoordinator.class);
+
+ // Executor strictly for establishing the initial connections. Once we're connected to the other end the rest of the
+ // streaming is handled directly by the ConnectionHandler's incoming and outgoing threads.
+ private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
+ FBUtilities.getAvailableProcessors());
+
+ // Per-peer session/progress bookkeeping. Guarded by "synchronized" on most public methods below
+ // (but see the NOTE on connectAllStreamSessions()).
+ private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>();
+ // Maximum number of concurrent StreamSessions to keep per peer; enforced in HostStreamingData.getOrCreateNextSession.
+ private final int connectionsPerHost;
+
+ /**
+ * @param connectionsPerHost cap on the number of StreamSessions created per peer before sessions are reused round-robin
+ */
+ public StreamCoordinator(int connectionsPerHost)
+ {
+ this.connectionsPerHost = connectionsPerHost;
+ }
+
+ /**
+ * @return true if any stream session is active
+ */
+ public synchronized boolean hasActiveSessions()
+ {
+ for (HostStreamingData data : peerSessions.values())
+ {
+ if (data.hasActiveSessions())
+ return true;
+ }
+ return false;
+ }
+
+ // Flattens every peer's sessions into one collection (a fresh ArrayList, safe for the caller to retain).
+ public synchronized Collection<StreamSession> getAllStreamSessions()
+ {
+ Collection<StreamSession> results = new ArrayList<>();
+ for (HostStreamingData data : peerSessions.values())
+ {
+ results.addAll(data.getAllStreamSessions());
+ }
+ return results;
+ }
+
+ // Submits every known session to streamExecutor for connection establishment.
+ // NOTE(review): unlike the other public methods this one is not synchronized while it iterates
+ // peerSessions — confirm callers serialize it with respect to session creation.
+ public void connectAllStreamSessions()
+ {
+ for (HostStreamingData data : peerSessions.values())
+ data.connectAllStreamSessions();
+ }
+
+ // Snapshot copy of the peer set; mutating it does not affect the coordinator.
+ public synchronized Set<InetAddress> getPeers()
+ {
+ return new HashSet<>(peerSessions.keySet());
+ }
+
+ // Returns the next session for this peer: creates new sessions until connectionsPerHost is
+ // reached, then hands out existing ones round-robin (see HostStreamingData.getOrCreateNextSession).
+ public synchronized StreamSession getOrCreateNextSession(InetAddress peer)
+ {
+ return getOrCreateHostData(peer).getOrCreateNextSession(peer);
+ }
+
+ // Returns the session with the given index for this peer, creating it on demand
+ // (used on the receiving side, where the index arrives from the remote end).
+ public synchronized StreamSession getOrCreateSessionById(InetAddress peer, int id)
+ {
+ return getOrCreateHostData(peer).getOrCreateSessionById(peer, id);
+ }
+
+ // Routes a progress update to the matching per-peer, per-session-index SessionInfo.
+ // Throws IllegalArgumentException for an unknown peer (see getHostData).
+ public synchronized void updateProgress(ProgressInfo info)
+ {
+ getHostData(info.peer).updateProgress(info);
+ }
+
+ public synchronized void addSessionInfo(SessionInfo session)
+ {
+ HostStreamingData data = getOrCreateHostData(session.peer);
+ data.addSessionInfo(session);
+ }
+
+ public synchronized Set<SessionInfo> getAllSessionInfo()
+ {
+ Set<SessionInfo> result = new HashSet<>();
+ for (HostStreamingData data : peerSessions.values())
+ {
+ result.addAll(data.getAllSessionInfo());
+ }
+ return result;
+ }
+
+ // Assigns the given sstable sections to sessions for peer "to". With more than one connection
+ // per host the sections are sliced into buckets, one bucket per session, so transfers can
+ // proceed in parallel; otherwise everything goes to a single session.
+ public synchronized void transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+ {
+ HostStreamingData sessionList = getOrCreateHostData(to);
+
+ if (connectionsPerHost > 1)
+ {
+ List<List<StreamSession.SSTableStreamingSections>> buckets = sliceSSTableDetails(sstableDetails);
+
+ for (List<StreamSession.SSTableStreamingSections> subList : buckets)
+ {
+ StreamSession session = sessionList.getOrCreateNextSession(to);
+ session.addTransferFiles(subList);
+ }
+ }
+ else
+ {
+ StreamSession session = sessionList.getOrCreateNextSession(to);
+ session.addTransferFiles(sstableDetails);
+ }
+ }
+
+ // Partitions sstableDetails into at most connectionsPerHost roughly equal-sized buckets,
+ // preserving the iteration order of the input collection.
+ // NOTE(review): an empty sstableDetails makes targetSlices 0 and divides by zero on the next
+ // line — confirm callers never pass an empty collection here.
+ private List<List<StreamSession.SSTableStreamingSections>> sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+ {
+ // There's no point in divvying things up into more buckets than we have sstableDetails
+ int targetSlices = Math.min(sstableDetails.size(), connectionsPerHost);
+ int step = Math.round((float) sstableDetails.size() / (float) targetSlices);
+ int index = 0;
+
+ List<List<StreamSession.SSTableStreamingSections>> result = new ArrayList<>();
+ List<StreamSession.SSTableStreamingSections> slice = null;
+ for (StreamSession.SSTableStreamingSections streamSession : sstableDetails)
+ {
+ // Start a new bucket every "step" entries.
+ if (index % step == 0)
+ {
+ slice = new ArrayList<>();
+ result.add(slice);
+ }
+ slice.add(streamSession);
+ ++index;
+ }
+
+ return result;
+ }
+
+ // Strict lookup: fails loudly rather than silently creating state for a peer we never set up.
+ private HostStreamingData getHostData(InetAddress peer)
+ {
+ HostStreamingData data = peerSessions.get(peer);
+ if (data == null)
+ throw new IllegalArgumentException("Unknown peer requested: " + peer.toString());
+ return data;
+ }
+
+ // Lazy lookup: creates an empty HostStreamingData entry on first access for a peer.
+ private HostStreamingData getOrCreateHostData(InetAddress peer)
+ {
+ HostStreamingData data = peerSessions.get(peer);
+ if (data == null)
+ {
+ data = new HostStreamingData();
+ peerSessions.put(peer, data);
+ }
+ return data;
+ }
+
+ // Task run on streamExecutor: starts one session and logs the start.
+ private class StreamSessionConnector implements Runnable
+ {
+ private final StreamSession session;
+ public StreamSessionConnector(StreamSession session)
+ {
+ this.session = session;
+ }
+
+ public void run()
+ {
+ session.start();
+ logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.planId(), session.sessionIndex(), session.peer);
+ }
+ }
+
+ // Tracks the sessions and per-session progress for a single peer.
+ private class HostStreamingData
+ {
+ // Sessions keyed by session index.
+ private Map<Integer, StreamSession> streamSessions = new HashMap<>();
+ // SessionInfo keyed by the same session index; populated via addSessionInfo.
+ private Map<Integer, SessionInfo> sessionInfos = new HashMap<>();
+
+ // Index of the session handed out most recently; -1 until the first session is created.
+ private int lastReturned = -1;
+
+ // A session counts as active until it reaches COMPLETE or FAILED.
+ public boolean hasActiveSessions()
+ {
+ for (StreamSession session : streamSessions.values())
+ {
+ StreamSession.State state = session.state();
+ if (state != StreamSession.State.COMPLETE && state != StreamSession.State.FAILED)
+ return true;
+ }
+ return false;
+ }
+
+ public StreamSession getOrCreateNextSession(InetAddress peer)
+ {
+ // create: below the connectionsPerHost cap, each call makes a new session with the next sequential index
+ if (streamSessions.size() < connectionsPerHost)
+ {
+ StreamSession session = new StreamSession(peer, streamSessions.size());
+ streamSessions.put(++lastReturned, session);
+ return session;
+ }
+ // get: cap reached, hand out existing sessions round-robin
+ else
+ {
+ if (lastReturned == streamSessions.size() - 1)
+ lastReturned = 0;
+
+ return streamSessions.get(lastReturned++);
+ }
+ }
+
+ public void connectAllStreamSessions()
+ {
+ for (StreamSession session : streamSessions.values())
+ {
+ streamExecutor.execute(new StreamSessionConnector(session));
+ }
+ }
+
+ // Unmodifiable view — callers must not mutate, and the view tracks later additions.
+ public Collection<StreamSession> getAllStreamSessions()
+ {
+ return Collections.unmodifiableCollection(streamSessions.values());
+ }
+
+ public StreamSession getOrCreateSessionById(InetAddress peer, int id)
+ {
+ StreamSession session = streamSessions.get(id);
+ if (session == null)
+ {
+ session = new StreamSession(peer, id);
+ streamSessions.put(id, session);
+ }
+ return session;
+ }
+
+ // NOTE(review): NPEs if no SessionInfo was registered for info.sessionIndex — assumes
+ // addSessionInfo always runs before the first progress update for that session; confirm.
+ public void updateProgress(ProgressInfo info)
+ {
+ sessionInfos.get(info.sessionIndex).updateProgress(info);
+ }
+
+ public void addSessionInfo(SessionInfo info)
+ {
+ sessionInfos.put(info.sessionIndex, info);
+ }
+
+ // Live view of the map values; callers (e.g. getAllSessionInfo above) copy it before exposure.
+ public Collection<SessionInfo> getAllSessionInfo()
+ {
+ return sessionInfos.values();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java
index 9af1fbd..8089323 100644
--- a/src/java/org/apache/cassandra/streaming/StreamEvent.java
+++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java
@@ -42,12 +42,14 @@ public abstract class StreamEvent
{
public final InetAddress peer;
public final boolean success;
+ public final int sessionIndex;
public SessionCompleteEvent(StreamSession session)
{
super(Type.STREAM_COMPLETE, session.planId());
this.peer = session.peer;
this.success = session.isSuccess();
+ this.sessionIndex = session.sessionIndex();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 04bd7df..e775c90 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -35,10 +35,8 @@ public class StreamPlan
private final UUID planId = UUIDGen.getTimeUUID();
private final String description;
private final List<StreamEventHandler> handlers = new ArrayList<>();
-
- // sessions per InetAddress of the other end.
- private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
private final long repairedAt;
+ private final StreamCoordinator coordinator;
private boolean flushBeforeTransfer = true;
@@ -49,16 +47,16 @@ public class StreamPlan
*/
public StreamPlan(String description)
{
- this(description, ActiveRepairService.UNREPAIRED_SSTABLE);
+ this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1);
}
- public StreamPlan(String description, long repairedAt)
+ public StreamPlan(String description, long repairedAt, int connectionsPerHost)
{
this.description = description;
this.repairedAt = repairedAt;
+ this.coordinator = new StreamCoordinator(connectionsPerHost);
}
-
/**
* Request data in {@code keyspace} and {@code ranges} from specific node.
*
@@ -83,7 +81,7 @@ public class StreamPlan
*/
public StreamPlan requestRanges(InetAddress from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
- StreamSession session = getOrCreateSession(from);
+ StreamSession session = coordinator.getOrCreateNextSession(from);
session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies), repairedAt);
return this;
}
@@ -112,7 +110,7 @@ public class StreamPlan
*/
public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
- StreamSession session = getOrCreateSession(to);
+ StreamSession session = coordinator.getOrCreateNextSession(to);
session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer, repairedAt);
return this;
}
@@ -127,9 +125,9 @@ public class StreamPlan
*/
public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
{
- StreamSession session = getOrCreateSession(to);
- session.addTransferFiles(sstableDetails);
+ coordinator.transferFiles(to, sstableDetails);
return this;
+
}
public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers)
@@ -145,7 +143,7 @@ public class StreamPlan
*/
public boolean isEmpty()
{
- return sessions.isEmpty();
+ return coordinator.hasActiveSessions();
}
/**
@@ -155,7 +153,7 @@ public class StreamPlan
*/
public StreamResultFuture execute()
{
- return StreamResultFuture.init(planId, description, sessions.values(), handlers);
+ return StreamResultFuture.init(planId, description, handlers, coordinator);
}
/**
@@ -170,16 +168,4 @@ public class StreamPlan
this.flushBeforeTransfer = flushBeforeTransfer;
return this;
}
-
- private StreamSession getOrCreateSession(InetAddress peer)
- {
- StreamSession session = sessions.get(peer);
- if (session == null)
- {
- session = new StreamSession(peer);
- sessions.put(peer, session);
- }
- return session;
- }
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index dcffaff..c04c3f1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -23,10 +23,8 @@ import java.net.Socket;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
-import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,11 +47,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
public final UUID planId;
public final String description;
+ private final StreamCoordinator coordinator;
private final Collection<StreamEventHandler> eventListeners = new ConcurrentLinkedQueue<>();
- private final Map<InetAddress, StreamSession> ongoingSessions;
- private final Map<InetAddress, SessionInfo> sessionStates = new NonBlockingHashMap<>();
-
/**
* Create new StreamResult of given {@code planId} and type.
*
@@ -62,22 +58,25 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
* @param planId Stream plan ID
* @param description Stream description
*/
- private StreamResultFuture(UUID planId, String description, Collection<StreamSession> sessions)
+ private StreamResultFuture(UUID planId, String description, StreamCoordinator coordinator)
{
this.planId = planId;
this.description = description;
- this.ongoingSessions = new HashMap<>(sessions.size());
- for (StreamSession session : sessions)
- this.ongoingSessions.put(session.peer, session);
+ this.coordinator = coordinator;
// if there is no session to listen to, we immediately set result for returning
- if (sessions.isEmpty())
+ if (!coordinator.hasActiveSessions())
set(getCurrentState());
}
- static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions, Collection<StreamEventHandler> listeners)
+ private StreamResultFuture(UUID planId, String description)
+ {
+ this(planId, description, new StreamCoordinator(0));
+ }
+
+ static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator)
{
- StreamResultFuture future = createAndRegister(planId, description, sessions);
+ StreamResultFuture future = createAndRegister(planId, description, coordinator);
if (listeners != null)
{
for (StreamEventHandler listener : listeners)
@@ -85,18 +84,19 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
}
logger.info("[Stream #{}] Executing streaming plan for {}", planId, description);
- // start sessions
- for (final StreamSession session : sessions)
+
+ // Initialize and start all sessions
+ for (final StreamSession session : coordinator.getAllStreamSessions())
{
- logger.info("[Stream #{}] Beginning stream session with {}", planId, session.peer);
session.init(future);
- session.start();
}
+ coordinator.connectAllStreamSessions();
return future;
}
- public static synchronized StreamResultFuture initReceivingSide(UUID planId,
+ public static synchronized StreamResultFuture initReceivingSide(int sessionIndex,
+ UUID planId,
String description,
InetAddress from,
Socket socket,
@@ -106,35 +106,28 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
if (future == null)
{
- final StreamSession session = new StreamSession(from);
+ logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
- future = new StreamResultFuture(planId, description, Collections.singleton(session));
+ future = new StreamResultFuture(planId, description);
StreamManager.instance.registerReceiving(future);
-
- session.init(future);
- session.handler.initiateOnReceivingSide(socket, isForOutgoing, version);
- }
- else
- {
- future.attachSocket(from, socket, isForOutgoing, version);
- logger.info("[Stream #{}] Received streaming plan for {}", planId, description);
}
+ future.attachSocket(from, sessionIndex, socket, isForOutgoing, version);
+ logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, description);
return future;
}
- private static StreamResultFuture createAndRegister(UUID planId, String description, Collection<StreamSession> sessions)
+ private static StreamResultFuture createAndRegister(UUID planId, String description, StreamCoordinator coordinator)
{
- StreamResultFuture future = new StreamResultFuture(planId, description, sessions);
+ StreamResultFuture future = new StreamResultFuture(planId, description, coordinator);
StreamManager.instance.register(future);
return future;
}
- public void attachSocket(InetAddress from, Socket socket, boolean isForOutgoing, int version) throws IOException
+ private void attachSocket(InetAddress from, int sessionIndex, Socket socket, boolean isForOutgoing, int version) throws IOException
{
- StreamSession session = ongoingSessions.get(from);
- if (session == null)
- throw new RuntimeException(String.format("Got connection from %s for stream session %s but no such session locally", from, planId));
+ StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex);
+ session.init(this);
session.handler.initiateOnReceivingSide(socket, isForOutgoing, version);
}
@@ -149,7 +142,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
*/
public StreamState getCurrentState()
{
- return new StreamState(planId, description, ImmutableSet.copyOf(sessionStates.values()));
+ return new StreamState(planId, description, coordinator.getAllSessionInfo());
}
@Override
@@ -170,44 +163,41 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
void handleSessionPrepared(StreamSession session)
{
SessionInfo sessionInfo = session.getSessionInfo();
- logger.info("[Stream #{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
+ logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
session.planId(),
+ session.sessionIndex(),
sessionInfo.getTotalFilesToReceive(),
sessionInfo.getTotalSizeToReceive(),
sessionInfo.getTotalFilesToSend(),
sessionInfo.getTotalSizeToSend());
StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo);
- sessionStates.put(sessionInfo.peer, sessionInfo);
+ coordinator.addSessionInfo(sessionInfo);
fireStreamEvent(event);
}
void handleSessionComplete(StreamSession session)
{
logger.info("[Stream #{}] Session with {} is complete", session.planId(), session.peer);
-
- SessionInfo sessionInfo = session.getSessionInfo();
- sessionStates.put(sessionInfo.peer, sessionInfo);
fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
- maybeComplete(session);
+ maybeComplete();
}
public void handleProgress(ProgressInfo progress)
{
- sessionStates.get(progress.peer).updateProgress(progress);
+ coordinator.updateProgress(progress);
fireStreamEvent(new StreamEvent.ProgressEvent(planId, progress));
}
- void fireStreamEvent(StreamEvent event)
+ synchronized void fireStreamEvent(StreamEvent event)
{
// delegate to listener
for (StreamEventHandler listener : eventListeners)
listener.handleStreamEvent(event);
}
- private synchronized void maybeComplete(StreamSession session)
+ private synchronized void maybeComplete()
{
- ongoingSessions.remove(session.peer);
- if (ongoingSessions.isEmpty())
+ if (!coordinator.hasActiveSessions())
{
StreamState finalState = getCurrentState();
if (finalState.hasFailedSession())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index c5f4cf9..a1c571b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
@@ -113,12 +112,8 @@ import org.apache.cassandra.utils.Pair;
public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
{
private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
-
- // Executor that establish the streaming connection. Once we're connected to the other end, the rest of the streaming
- // is directly handled by the ConnectionHandler incoming and outgoing threads.
- private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
- FBUtilities.getAvailableProcessors());
public final InetAddress peer;
+ private final int index;
// should not be null when session is started
private StreamResultFuture streamResult;
@@ -153,9 +148,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
*
* @param peer Address of streaming peer
*/
- public StreamSession(InetAddress peer)
+ public StreamSession(InetAddress peer, int index)
{
this.peer = peer;
+ this.index = index;
this.handler = new ConnectionHandler(this);
this.metrics = StreamingMetrics.get(peer);
}
@@ -165,6 +161,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
return streamResult == null ? null : streamResult.planId;
}
+ public int sessionIndex()
+ {
+ return index;
+ }
+
public String description()
{
return streamResult == null ? null : streamResult.description;
@@ -194,21 +195,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
return;
}
- streamExecutor.execute(new Runnable()
+ try
{
- public void run()
- {
- try
- {
- handler.initiate();
- onInitializationComplete();
- }
- catch (IOException e)
- {
- onError(e);
- }
- }
- });
+ logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", planId(), sessionIndex(), peer);
+ handler.initiate();
+ onInitializationComplete();
+ }
+ catch (Exception e)
+ {
+ onError(e);
+ }
}
/**
@@ -519,7 +515,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total)
{
- ProgressInfo progress = new ProgressInfo(peer, desc.filenameFor(Component.DATA), direction, bytes, total);
+ ProgressInfo progress = new ProgressInfo(peer, index, desc.filenameFor(Component.DATA), direction, bytes, total);
streamResult.handleProgress(progress);
}
@@ -590,7 +586,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
List<StreamSummary> transferSummaries = Lists.newArrayList();
for (StreamTask transfer : transfers.values())
transferSummaries.add(transfer.getSummary());
- return new SessionInfo(peer, receivingSummaries, transferSummaries, state);
+ return new SessionInfo(peer, index, receivingSummaries, transferSummaries, state);
}
public synchronized void taskCompleted(StreamReceiveTask completedTask)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
index b361b1b..a54498d 100644
--- a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
@@ -32,18 +32,21 @@ public class ProgressInfoCompositeData
{
private static final String[] ITEM_NAMES = new String[]{"planId",
"peer",
+ "sessionIndex",
"fileName",
"direction",
"currentBytes",
"totalBytes"};
private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID",
"Session peer",
+ "Index of session",
"Name of the file",
"Direction('IN' or 'OUT')",
"Current bytes transferred",
"Total bytes to transfer"};
private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
SimpleType.STRING,
+ SimpleType.INTEGER,
SimpleType.STRING,
SimpleType.STRING,
SimpleType.LONG,
@@ -70,10 +73,11 @@ public class ProgressInfoCompositeData
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], planId.toString());
valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress());
- valueMap.put(ITEM_NAMES[2], progressInfo.fileName);
- valueMap.put(ITEM_NAMES[3], progressInfo.direction.name());
- valueMap.put(ITEM_NAMES[4], progressInfo.currentBytes);
- valueMap.put(ITEM_NAMES[5], progressInfo.totalBytes);
+ valueMap.put(ITEM_NAMES[2], progressInfo.sessionIndex);
+ valueMap.put(ITEM_NAMES[3], progressInfo.fileName);
+ valueMap.put(ITEM_NAMES[4], progressInfo.direction.name());
+ valueMap.put(ITEM_NAMES[5], progressInfo.currentBytes);
+ valueMap.put(ITEM_NAMES[6], progressInfo.totalBytes);
try
{
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
@@ -90,10 +94,11 @@ public class ProgressInfoCompositeData
try
{
return new ProgressInfo(InetAddress.getByName((String) values[1]),
- (String) values[2],
- ProgressInfo.Direction.valueOf((String)values[3]),
- (long) values[4],
- (long) values[5]);
+ (int) values[2],
+ (String) values[3],
+ ProgressInfo.Direction.valueOf((String)values[4]),
+ (long) values[5],
+ (long) values[6]);
}
catch (UnknownHostException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
index 658facf..8618cca 100644
--- a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@ -129,6 +129,7 @@ public class SessionInfoCompositeData
}
};
SessionInfo info = new SessionInfo(peer,
+ (int)values[2],
fromArrayOfCompositeData((CompositeData[]) values[2], toStreamSummary),
fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
StreamSession.State.valueOf((String) values[4]));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 947e5d5..a9ec4ae 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -40,15 +40,17 @@ public class StreamInitMessage
public static IVersionedSerializer<StreamInitMessage> serializer = new StreamInitMessageSerializer();
public final InetAddress from;
+ public final int sessionIndex;
public final UUID planId;
public final String description;
// true if this init message is to connect for outgoing message on receiving side
public final boolean isForOutgoing;
- public StreamInitMessage(InetAddress from, UUID planId, String description, boolean isForOutgoing)
+ public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing)
{
this.from = from;
+ this.sessionIndex = sessionIndex;
this.planId = planId;
this.description = description;
this.isForOutgoing = isForOutgoing;
@@ -99,6 +101,7 @@ public class StreamInitMessage
public void serialize(StreamInitMessage message, DataOutputPlus out, int version) throws IOException
{
CompactEndpointSerializationHelper.serialize(message.from, out);
+ out.writeInt(message.sessionIndex);
UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version);
out.writeUTF(message.description);
out.writeBoolean(message.isForOutgoing);
@@ -107,15 +110,17 @@ public class StreamInitMessage
public StreamInitMessage deserialize(DataInput in, int version) throws IOException
{
InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
+ int sessionIndex = in.readInt();
UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
String description = in.readUTF();
boolean sentByInitiator = in.readBoolean();
- return new StreamInitMessage(from, planId, description, sentByInitiator);
+ return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator);
}
public long serializedSize(StreamInitMessage message, int version)
{
long size = CompactEndpointSerializationHelper.serializedSize(message.from);
+ size += TypeSizes.NATIVE.sizeof(message.sessionIndex);
size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
size += TypeSizes.NATIVE.sizeof(message.description);
size += TypeSizes.NATIVE.sizeof(message.isForOutgoing);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 50c3911..360b59e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.streaming.StreamSession;
public abstract class StreamMessage
{
/** Streaming protocol version */
- public static final int CURRENT_VERSION = 1;
+ public static final int CURRENT_VERSION = 2;
public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 01952e5..6042f92 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -21,12 +21,10 @@ import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import com.google.common.base.Joiner;
-import com.google.common.collect.Sets;
-import org.apache.cassandra.config.EncryptionOptions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import org.apache.commons.cli.*;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -35,6 +33,7 @@ import org.apache.thrift.transport.TTransport;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.UTF8Type;
@@ -50,7 +49,6 @@ public class BulkLoader
{
private static final String TOOL_NAME = "sstableloader";
private static final String VERBOSE_OPTION = "verbose";
- private static final String DEBUG_OPTION = "debug";
private static final String HELP_OPTION = "help";
private static final String NOPROGRESS_OPTION = "no-progress";
private static final String IGNORE_NODES_OPTION = "ignore";
@@ -68,47 +66,63 @@ public class BulkLoader
private static final String SSL_ALGORITHM = "ssl-alg";
private static final String SSL_STORE_TYPE = "store-type";
private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
+ private static final String CONNECTIONS_PER_HOST = "connections-per-host";
public static void main(String args[])
{
LoaderOptions options = LoaderOptions.parseArgs(args);
- OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
- SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd, options.transportFactory), handler);
+ OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, true);
+ SSTableLoader loader = new SSTableLoader(
+ options.directory,
+ new ExternalClient(
+ options.hosts,
+ options.rpcPort,
+ options.user,
+ options.passwd,
+ options.transportFactory),
+ handler,
+ options.connectionsPerHost);
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
StreamResultFuture future = null;
+
+ ProgressIndicator indicator = new ProgressIndicator();
try
{
if (options.noProgress)
+ {
future = loader.stream(options.ignores);
+ }
else
- future = loader.stream(options.ignores, new ProgressIndicator());
+ {
+ future = loader.stream(options.ignores, indicator);
+ }
+
}
catch (Exception e)
{
System.err.println(e.getMessage());
if (e.getCause() != null)
System.err.println(e.getCause());
- if (options.debug)
- e.printStackTrace(System.err);
- else
- System.err.println("Run with --debug to get full stack trace or --help to get help.");
+ e.printStackTrace(System.err);
System.exit(1);
}
- handler.output(String.format("Streaming session ID: %s", future.planId));
-
try
{
future.get();
+
+ if (!options.noProgress)
+ indicator.printSummary(options.connectionsPerHost);
+
+ // Give sockets time to gracefully close
+ Thread.sleep(1000);
System.exit(0); // We need that to stop non daemonized threads
}
catch (Exception e)
{
System.err.println("Streaming to the following hosts failed:");
System.err.println(loader.getFailedHosts());
- System.err.println(e);
- if (options.debug)
- e.printStackTrace(System.err);
+ e.printStackTrace(System.err);
System.exit(1);
}
}
@@ -116,13 +130,15 @@ public class BulkLoader
// Return true when everything is at 100%
static class ProgressIndicator implements StreamEventHandler
{
- private final Map<InetAddress, SessionInfo> sessionsByHost = new ConcurrentHashMap<>();
- private final Map<InetAddress, Set<ProgressInfo>> progressByHost = new ConcurrentHashMap<>();
-
private long start;
private long lastProgress;
private long lastTime;
+ private int peak = 0;
+ private int totalFiles = 0;
+
+ private final Multimap<InetAddress, SessionInfo> sessionsByHost = HashMultimap.create();
+
public ProgressIndicator()
{
start = lastTime = System.nanoTime();
@@ -131,70 +147,100 @@ public class BulkLoader
public void onSuccess(StreamState finalState) {}
public void onFailure(Throwable t) {}
- public void handleStreamEvent(StreamEvent event)
+ public synchronized void handleStreamEvent(StreamEvent event)
{
if (event.eventType == StreamEvent.Type.STREAM_PREPARED)
{
SessionInfo session = ((StreamEvent.SessionPreparedEvent) event).session;
sessionsByHost.put(session.peer, session);
}
- else if (event.eventType == StreamEvent.Type.FILE_PROGRESS)
+ else if (event.eventType == StreamEvent.Type.FILE_PROGRESS || event.eventType == StreamEvent.Type.STREAM_COMPLETE)
{
- ProgressInfo progressInfo = ((StreamEvent.ProgressEvent) event).progress;
-
- // update progress
- Set<ProgressInfo> progresses = progressByHost.get(progressInfo.peer);
- if (progresses == null)
+ ProgressInfo progressInfo = null;
+ if (event.eventType == StreamEvent.Type.FILE_PROGRESS)
{
- progresses = Sets.newSetFromMap(new ConcurrentHashMap<ProgressInfo, Boolean>());
- progressByHost.put(progressInfo.peer, progresses);
+ progressInfo = ((StreamEvent.ProgressEvent) event).progress;
}
- if (progresses.contains(progressInfo))
- progresses.remove(progressInfo);
- progresses.add(progressInfo);
+
+ long time = System.nanoTime();
+ long deltaTime = time - lastTime;
StringBuilder sb = new StringBuilder();
sb.append("\rprogress: ");
long totalProgress = 0;
long totalSize = 0;
- for (Map.Entry<InetAddress, Set<ProgressInfo>> entry : progressByHost.entrySet())
+
+ boolean updateTotalFiles = totalFiles == 0;
+ // recalculate progress across all sessions in all hosts and display
+ for (InetAddress peer : sessionsByHost.keySet())
{
- SessionInfo session = sessionsByHost.get(entry.getKey());
+ sb.append("[").append(peer.toString()).append("]");
- long size = session.getTotalSizeToSend();
- long current = 0;
- int completed = 0;
- for (ProgressInfo progress : entry.getValue())
+ for (SessionInfo session : sessionsByHost.get(peer))
{
- if (progress.currentBytes == progress.totalBytes)
- completed++;
- current += progress.currentBytes;
+ long size = session.getTotalSizeToSend();
+ long current = 0;
+ int completed = 0;
+
+ if (progressInfo != null && session.peer.equals(progressInfo.peer) && (session.sessionIndex == progressInfo.sessionIndex))
+ {
+ session.updateProgress(progressInfo);
+ }
+ for (ProgressInfo progress : session.getSendingFiles())
+ {
+ if (progress.isCompleted())
+ completed++;
+ current += progress.currentBytes;
+ }
+ totalProgress += current;
+
+ totalSize += size;
+
+ sb.append(session.sessionIndex).append(":");
+ sb.append(completed).append("/").append(session.getTotalFilesToSend());
+ sb.append(" ").append(String.format("%-3d", size == 0 ? 100L : current * 100L / size)).append("% ");
+
+ if (updateTotalFiles)
+ totalFiles += session.getTotalFilesToSend();
}
- totalProgress += current;
- totalSize += size;
- sb.append("[").append(entry.getKey());
- sb.append(" ").append(completed).append("/").append(session.getTotalFilesToSend());
- sb.append(" (").append(size == 0 ? 100L : current * 100L / size).append("%)] ");
}
- long time = System.nanoTime();
- long deltaTime = TimeUnit.NANOSECONDS.toMillis(time - lastTime);
+
lastTime = time;
long deltaProgress = totalProgress - lastProgress;
lastProgress = totalProgress;
- sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append("% - ");
- sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
- sb.append(" (avg: ").append(mbPerSec(totalProgress, TimeUnit.NANOSECONDS.toMillis(time - start))).append("MB/s)]");
+ sb.append("total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append("% ");
+ sb.append(String.format("%-3d", mbPerSec(deltaProgress, deltaTime))).append("MB/s");
+ int average = mbPerSec(totalProgress, (time - start));
+ if (average > peak)
+ peak = average;
+ sb.append("(avg: ").append(average).append(" MB/s)");
- System.out.print(sb.toString());
+ System.err.print(sb.toString());
}
}
- private int mbPerSec(long bytes, long timeInMs)
+ private int mbPerSec(long bytes, long timeInNano)
{
- double bytesPerMs = ((double)bytes) / timeInMs;
- return (int)((bytesPerMs * 1000) / (1024 * 2024));
+ double bytesPerNano = ((double)bytes) / timeInNano;
+ return (int)((bytesPerNano * 1000 * 1000 * 1000) / (1024 * 2024));
+ }
+
+ private void printSummary(int connectionsPerHost)
+ {
+ long end = System.nanoTime();
+ long durationMS = ((end - start) / (1000000));
+ int average = mbPerSec(lastProgress, (end - start));
+ StringBuilder sb = new StringBuilder();
+ sb.append("\nSummary statistics: \n");
+ sb.append(String.format(" %-30s: %-10d\n", "Connections per host: ", connectionsPerHost));
+ sb.append(String.format(" %-30s: %-10d\n", "Total files transferred: ", totalFiles));
+ sb.append(String.format(" %-30s: %-10d\n", "Total bytes transferred: ", lastProgress));
+ sb.append(String.format(" %-30s: %-10d\n", "Total duration (ms): ", durationMS));
+ sb.append(String.format(" %-30s: %-10d\n", "Average transfer rate (MB/s): ", + average));
+ sb.append(String.format(" %-30s: %-10d\n", "Peak transfer rate (MB/s): ", + peak));
+ System.err.println(sb.toString());
}
}
@@ -282,7 +328,7 @@ public class BulkLoader
Cassandra.Client client = new Cassandra.Client(protocol);
if (user != null && passwd != null)
{
- Map<String, String> credentials = new HashMap<String, String>();
+ Map<String, String> credentials = new HashMap<>();
credentials.put(IAuthenticator.USERNAME_KEY, user);
credentials.put(IAuthenticator.PASSWORD_KEY, passwd);
AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
@@ -305,9 +351,10 @@ public class BulkLoader
public int throttle = 0;
public ITransportFactory transportFactory = new TFramedTransportFactory();
public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
+ public int connectionsPerHost = 1;
- public final Set<InetAddress> hosts = new HashSet<InetAddress>();
- public final Set<InetAddress> ignores = new HashSet<InetAddress>();
+ public final Set<InetAddress> hosts = new HashSet<>();
+ public final Set<InetAddress> ignores = new HashSet<>();
LoaderOptions(File directory)
{
@@ -354,7 +401,6 @@ public class BulkLoader
LoaderOptions opts = new LoaderOptions(dir);
- opts.debug = cmd.hasOption(DEBUG_OPTION);
opts.verbose = cmd.hasOption(VERBOSE_OPTION);
opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
@@ -409,6 +455,9 @@ public class BulkLoader
}
}
+ if (cmd.hasOption(CONNECTIONS_PER_HOST))
+ opts.connectionsPerHost = Integer.parseInt(cmd.getOptionValue(CONNECTIONS_PER_HOST));
+
if(cmd.hasOption(SSL_TRUSTSTORE))
{
opts.encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE);
@@ -520,7 +569,6 @@ public class BulkLoader
private static CmdLineOptions getCmdLineOptions()
{
CmdLineOptions options = new CmdLineOptions();
- options.addOption(null, DEBUG_OPTION, "display stack traces");
options.addOption("v", VERBOSE_OPTION, "verbose output");
options.addOption("h", HELP_OPTION, "display this help message");
options.addOption(null, NOPROGRESS_OPTION, "don't display progress");
@@ -531,6 +579,7 @@ public class BulkLoader
options.addOption("u", USER_OPTION, "username", "username for cassandra authentication");
options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
options.addOption("tf", TRANSPORT_FACTORY, "transport factory", "Fully-qualified ITransportFactory class name for creating a connection to cassandra");
+ options.addOption("cph", CONNECTIONS_PER_HOST, "connectionsPerHost", "number of concurrent connections-per-host.");
// ssl connection-related options
options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore");
options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "SSL: password of the truststore");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
index 60fbf40..f015a01 100644
--- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
@@ -46,7 +46,7 @@ public class SessionInfoTest
}
StreamSummary sending = new StreamSummary(cfId, 10, 100);
- SessionInfo info = new SessionInfo(local, summaries, Collections.singleton(sending), StreamSession.State.PREPARING);
+ SessionInfo info = new SessionInfo(local, 0, summaries, Collections.singleton(sending), StreamSession.State.PREPARING);
assert info.getTotalFilesToReceive() == 45;
assert info.getTotalFilesToSend() == 10;
@@ -57,13 +57,13 @@ public class SessionInfoTest
assert info.getTotalFilesSent() == 0;
// receive in progress
- info.updateProgress(new ProgressInfo(local, "test.txt", ProgressInfo.Direction.IN, 50, 100));
+ info.updateProgress(new ProgressInfo(local, 0, "test.txt", ProgressInfo.Direction.IN, 50, 100));
// still in progress, but not completed yet
assert info.getTotalSizeReceived() == 50;
assert info.getTotalSizeSent() == 0;
assert info.getTotalFilesReceived() == 0;
assert info.getTotalFilesSent() == 0;
- info.updateProgress(new ProgressInfo(local, "test.txt", ProgressInfo.Direction.IN, 100, 100));
+ info.updateProgress(new ProgressInfo(local, 0, "test.txt", ProgressInfo.Direction.IN, 100, 100));
// 1 file should be completed
assert info.getTotalSizeReceived() == 100;
assert info.getTotalSizeSent() == 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index e22fa8f..c6c04b0 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -43,7 +43,7 @@ public class StreamTransferTaskTest extends SchemaLoader
String ks = "Keyspace1";
String cf = "Standard1";
- StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress());
+ StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), 0);
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
// create two sstables
[2/3] git commit: Parallel streaming for sstableloader
Posted by yu...@apache.org.
Parallel streaming for sstableloader
patch by Joshua Mckenzie; reviewed by yukim for CASSANDRA-3668
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44fa2cdb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44fa2cdb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44fa2cdb
Branch: refs/heads/trunk
Commit: 44fa2cdb48cf6b4e589d5c48f5c57f0a93c03b60
Parents: 31cd613
Author: Joshua McKenzie <jo...@datastax.com>
Authored: Thu May 1 17:51:12 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu May 1 17:51:12 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 9 +-
.../net/IncomingStreamingConnection.java | 2 +-
.../cassandra/repair/StreamingRepairTask.java | 2 +-
.../cassandra/streaming/ConnectionHandler.java | 7 +-
.../cassandra/streaming/ProgressInfo.java | 10 +-
.../apache/cassandra/streaming/SessionInfo.java | 3 +
.../cassandra/streaming/StreamCoordinator.java | 272 +++++++++++++++++++
.../apache/cassandra/streaming/StreamEvent.java | 2 +
.../apache/cassandra/streaming/StreamPlan.java | 34 +--
.../cassandra/streaming/StreamResultFuture.java | 80 +++---
.../cassandra/streaming/StreamSession.java | 42 ++-
.../management/ProgressInfoCompositeData.java | 21 +-
.../management/SessionInfoCompositeData.java | 1 +
.../streaming/messages/StreamInitMessage.java | 9 +-
.../streaming/messages/StreamMessage.java | 2 +-
.../org/apache/cassandra/tools/BulkLoader.java | 169 ++++++++----
.../cassandra/streaming/SessionInfoTest.java | 6 +-
.../streaming/StreamTransferTaskTest.java | 2 +-
19 files changed, 500 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16fcdfc..86940ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.0-rc1
+ * Parallel streaming for sstableloader (CASSANDRA-3668)
Merged from 2.0:
* Add Google Compute Engine snitch (CASSANDRA-7132)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index b14e203..bbb1277 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -48,6 +48,7 @@ public class SSTableLoader implements StreamEventHandler
private final File directory;
private final String keyspace;
private final Client client;
+ private final int connectionsPerHost;
private final OutputHandler outputHandler;
private final Set<InetAddress> failedHosts = new HashSet<>();
@@ -61,10 +62,16 @@ public class SSTableLoader implements StreamEventHandler
public SSTableLoader(File directory, Client client, OutputHandler outputHandler)
{
+ this(directory, client, outputHandler, 1);
+ }
+
+ public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost)
+ {
this.directory = directory;
this.keyspace = directory.getParentFile().getName();
this.client = client;
this.outputHandler = outputHandler;
+ this.connectionsPerHost = connectionsPerHost;
}
protected Collection<SSTableReader> openSSTables(final Map<InetAddress, Collection<Range<Token>>> ranges)
@@ -150,7 +157,7 @@ public class SSTableLoader implements StreamEventHandler
client.init(keyspace);
outputHandler.output("Established connection to initial hosts");
- StreamPlan plan = new StreamPlan("Bulk Load");
+ StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost);
Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
openSSTables(endpointToRanges);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 20392f2..003bbf9 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -62,7 +62,7 @@ public class IncomingStreamingConnection extends Thread
// The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing.
// Note: we cannot use the same socket for incoming and outgoing streams because we want to
// parallelize said streams and the socket is blocking, so we might deadlock.
- StreamResultFuture.initReceivingSide(init.planId, init.description, init.from, socket, init.isForOutgoing, version);
+ StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 636568c..b9184ca 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -71,7 +71,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst));
- StreamResultFuture op = new StreamPlan("Repair", repairedAt)
+ StreamResultFuture op = new StreamPlan("Repair", repairedAt, 1)
.flushBeforeTransfer(true)
// request ranges from the remote node
.requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 562645d..5484c83 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -217,7 +217,12 @@ public class ConnectionHandler
public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
{
- StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(), session.planId(), session.description(), isForOutgoing);
+ StreamInitMessage message = new StreamInitMessage(
+ FBUtilities.getBroadcastAddress(),
+ session.sessionIndex(),
+ session.planId(),
+ session.description(),
+ isForOutgoing);
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
getWriteChannel(socket).write(messageBuf);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/ProgressInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
index d308ed0..fdd3e97 100644
--- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -49,16 +49,18 @@ public class ProgressInfo implements Serializable
}
public final InetAddress peer;
+ public final int sessionIndex;
public final String fileName;
public final Direction direction;
public final long currentBytes;
public final long totalBytes;
- public ProgressInfo(InetAddress peer, String fileName, Direction direction, long currentBytes, long totalBytes)
+ public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes)
{
assert totalBytes > 0;
this.peer = peer;
+ this.sessionIndex = sessionIndex;
this.fileName = fileName;
this.direction = direction;
this.currentBytes = currentBytes;
@@ -70,7 +72,7 @@ public class ProgressInfo implements Serializable
*/
public boolean isCompleted()
{
- return currentBytes == totalBytes;
+ return currentBytes >= totalBytes;
}
/**
@@ -87,13 +89,14 @@ public class ProgressInfo implements Serializable
if (totalBytes != that.totalBytes) return false;
if (direction != that.direction) return false;
if (!fileName.equals(that.fileName)) return false;
+ if (sessionIndex != that.sessionIndex) return false;
return peer.equals(that.peer);
}
@Override
public int hashCode()
{
- return Objects.hashCode(peer, fileName, direction, totalBytes);
+ return Objects.hashCode(peer, sessionIndex, fileName, direction, totalBytes);
}
@Override
@@ -104,6 +107,7 @@ public class ProgressInfo implements Serializable
sb.append("/").append(totalBytes).append(" bytes");
sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
sb.append(direction == Direction.OUT ? "sent to " : "received from ");
+ sb.append("idx:").append(sessionIndex);
sb.append(peer);
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index b722ecf..98e945b 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Iterables;
public final class SessionInfo implements Serializable
{
public final InetAddress peer;
+ public final int sessionIndex;
/** Immutable collection of receiving summaries */
public final Collection<StreamSummary> receivingSummaries;
/** Immutable collection of sending summaries*/
@@ -44,11 +45,13 @@ public final class SessionInfo implements Serializable
private final Map<String, ProgressInfo> sendingFiles;
public SessionInfo(InetAddress peer,
+ int sessionIndex,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries,
StreamSession.State state)
{
this.peer = peer;
+ this.sessionIndex = sessionIndex;
this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
this.receivingFiles = new ConcurrentHashMap<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
new file mode 100644
index 0000000..425b5b1
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * {@link StreamCoordinator} is a helper class that abstracts away maintaining multiple
+ * StreamSession and ProgressInfo instances per peer.
+ *
+ * This class coordinates multiple StreamSessions per peer in both the outgoing StreamPlan context and on the
+ * inbound StreamResultFuture context.
+ */
+public class StreamCoordinator
+{
+ private static final Logger logger = LoggerFactory.getLogger(StreamCoordinator.class);
+
+ // Executor strictly for establishing the initial connections. Once we're connected to the other end the rest of the
+ // streaming is handled directly by the ConnectionHandler's incoming and outgoing threads.
+ private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
+ FBUtilities.getAvailableProcessors());
+
+ private Map<InetAddress, HostStreamingData> peerSessions = new HashMap<>();
+ private final int connectionsPerHost;
+
+ public StreamCoordinator(int connectionsPerHost)
+ {
+ this.connectionsPerHost = connectionsPerHost;
+ }
+
+ /**
+ * @return true if any stream session is active
+ */
+ public synchronized boolean hasActiveSessions()
+ {
+ for (HostStreamingData data : peerSessions.values())
+ {
+ if (data.hasActiveSessions())
+ return true;
+ }
+ return false;
+ }
+
+ public synchronized Collection<StreamSession> getAllStreamSessions()
+ {
+ Collection<StreamSession> results = new ArrayList<>();
+ for (HostStreamingData data : peerSessions.values())
+ {
+ results.addAll(data.getAllStreamSessions());
+ }
+ return results;
+ }
+
+ public synchronized void connectAllStreamSessions()
+ {
+ for (HostStreamingData data : peerSessions.values())
+ data.connectAllStreamSessions();
+ }
+
+ public synchronized Set<InetAddress> getPeers()
+ {
+ return new HashSet<>(peerSessions.keySet());
+ }
+
+ public synchronized StreamSession getOrCreateNextSession(InetAddress peer)
+ {
+ return getOrCreateHostData(peer).getOrCreateNextSession(peer);
+ }
+
+ public synchronized StreamSession getOrCreateSessionById(InetAddress peer, int id)
+ {
+ return getOrCreateHostData(peer).getOrCreateSessionById(peer, id);
+ }
+
+ public synchronized void updateProgress(ProgressInfo info)
+ {
+ getHostData(info.peer).updateProgress(info);
+ }
+
+ public synchronized void addSessionInfo(SessionInfo session)
+ {
+ HostStreamingData data = getOrCreateHostData(session.peer);
+ data.addSessionInfo(session);
+ }
+
+ public synchronized Set<SessionInfo> getAllSessionInfo()
+ {
+ Set<SessionInfo> result = new HashSet<>();
+ for (HostStreamingData data : peerSessions.values())
+ {
+ result.addAll(data.getAllSessionInfo());
+ }
+ return result;
+ }
+
+ public synchronized void transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+ {
+ HostStreamingData sessionList = getOrCreateHostData(to);
+
+ if (connectionsPerHost > 1)
+ {
+ List<List<StreamSession.SSTableStreamingSections>> buckets = sliceSSTableDetails(sstableDetails);
+
+ for (List<StreamSession.SSTableStreamingSections> subList : buckets)
+ {
+ StreamSession session = sessionList.getOrCreateNextSession(to);
+ session.addTransferFiles(subList);
+ }
+ }
+ else
+ {
+ StreamSession session = sessionList.getOrCreateNextSession(to);
+ session.addTransferFiles(sstableDetails);
+ }
+ }
+
+ private List<List<StreamSession.SSTableStreamingSections>> sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+ {
+ // There's no point in divvying things up into more buckets than we have sstableDetails
+ int targetSlices = Math.min(sstableDetails.size(), connectionsPerHost);
+ int step = Math.round((float) sstableDetails.size() / (float) targetSlices);
+ int index = 0;
+
+ List<List<StreamSession.SSTableStreamingSections>> result = new ArrayList<>();
+ List<StreamSession.SSTableStreamingSections> slice = null;
+ for (StreamSession.SSTableStreamingSections streamSession : sstableDetails)
+ {
+ if (index % step == 0)
+ {
+ slice = new ArrayList<>();
+ result.add(slice);
+ }
+ slice.add(streamSession);
+ ++index;
+ }
+
+ return result;
+ }
+
+ private HostStreamingData getHostData(InetAddress peer)
+ {
+ HostStreamingData data = peerSessions.get(peer);
+ if (data == null)
+ throw new IllegalArgumentException("Unknown peer requested: " + peer.toString());
+ return data;
+ }
+
+ private HostStreamingData getOrCreateHostData(InetAddress peer)
+ {
+ HostStreamingData data = peerSessions.get(peer);
+ if (data == null)
+ {
+ data = new HostStreamingData();
+ peerSessions.put(peer, data);
+ }
+ return data;
+ }
+
+ private class StreamSessionConnector implements Runnable
+ {
+ private final StreamSession session;
+ public StreamSessionConnector(StreamSession session)
+ {
+ this.session = session;
+ }
+
+ public void run()
+ {
+ session.start();
+ logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", session.planId(), session.sessionIndex(), session.peer);
+ }
+ }
+
+ private class HostStreamingData
+ {
+ private Map<Integer, StreamSession> streamSessions = new HashMap<>();
+ private Map<Integer, SessionInfo> sessionInfos = new HashMap<>();
+
+ private int lastReturned = -1;
+
+ public boolean hasActiveSessions()
+ {
+ for (StreamSession session : streamSessions.values())
+ {
+ StreamSession.State state = session.state();
+ if (state != StreamSession.State.COMPLETE && state != StreamSession.State.FAILED)
+ return true;
+ }
+ return false;
+ }
+
+ public StreamSession getOrCreateNextSession(InetAddress peer)
+ {
+ // create
+ if (streamSessions.size() < connectionsPerHost)
+ {
+ StreamSession session = new StreamSession(peer, streamSessions.size());
+ streamSessions.put(++lastReturned, session);
+ return session;
+ }
+ // get
+ else
+ {
+ // round-robin over the fixed pool; modulo wrap avoids the cursor walking
+ // past the last index (a post-increment here would eventually get(null))
+ lastReturned = (lastReturned + 1) % streamSessions.size();
+ return streamSessions.get(lastReturned);
+ }
+ }
+
+ public void connectAllStreamSessions()
+ {
+ for (StreamSession session : streamSessions.values())
+ {
+ streamExecutor.execute(new StreamSessionConnector(session));
+ }
+ }
+
+ public Collection<StreamSession> getAllStreamSessions()
+ {
+ return Collections.unmodifiableCollection(streamSessions.values());
+ }
+
+ public StreamSession getOrCreateSessionById(InetAddress peer, int id)
+ {
+ StreamSession session = streamSessions.get(id);
+ if (session == null)
+ {
+ session = new StreamSession(peer, id);
+ streamSessions.put(id, session);
+ }
+ return session;
+ }
+
+ public void updateProgress(ProgressInfo info)
+ {
+ sessionInfos.get(info.sessionIndex).updateProgress(info);
+ }
+
+ public void addSessionInfo(SessionInfo info)
+ {
+ sessionInfos.put(info.sessionIndex, info);
+ }
+
+ public Collection<SessionInfo> getAllSessionInfo()
+ {
+ return sessionInfos.values();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java
index 9af1fbd..8089323 100644
--- a/src/java/org/apache/cassandra/streaming/StreamEvent.java
+++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java
@@ -42,12 +42,14 @@ public abstract class StreamEvent
{
public final InetAddress peer;
public final boolean success;
+ public final int sessionIndex;
public SessionCompleteEvent(StreamSession session)
{
super(Type.STREAM_COMPLETE, session.planId());
this.peer = session.peer;
this.success = session.isSuccess();
+ this.sessionIndex = session.sessionIndex();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 04bd7df..e775c90 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -35,10 +35,8 @@ public class StreamPlan
private final UUID planId = UUIDGen.getTimeUUID();
private final String description;
private final List<StreamEventHandler> handlers = new ArrayList<>();
-
- // sessions per InetAddress of the other end.
- private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
private final long repairedAt;
+ private final StreamCoordinator coordinator;
private boolean flushBeforeTransfer = true;
@@ -49,16 +47,16 @@ public class StreamPlan
*/
public StreamPlan(String description)
{
- this(description, ActiveRepairService.UNREPAIRED_SSTABLE);
+ this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1);
}
- public StreamPlan(String description, long repairedAt)
+ public StreamPlan(String description, long repairedAt, int connectionsPerHost)
{
this.description = description;
this.repairedAt = repairedAt;
+ this.coordinator = new StreamCoordinator(connectionsPerHost);
}
-
/**
* Request data in {@code keyspace} and {@code ranges} from specific node.
*
@@ -83,7 +81,7 @@ public class StreamPlan
*/
public StreamPlan requestRanges(InetAddress from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
- StreamSession session = getOrCreateSession(from);
+ StreamSession session = coordinator.getOrCreateNextSession(from);
session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies), repairedAt);
return this;
}
@@ -112,7 +110,7 @@ public class StreamPlan
*/
public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
- StreamSession session = getOrCreateSession(to);
+ StreamSession session = coordinator.getOrCreateNextSession(to);
session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer, repairedAt);
return this;
}
@@ -127,9 +125,9 @@ public class StreamPlan
*/
public StreamPlan transferFiles(InetAddress to, Collection<StreamSession.SSTableStreamingSections> sstableDetails)
{
- StreamSession session = getOrCreateSession(to);
- session.addTransferFiles(sstableDetails);
+ coordinator.transferFiles(to, sstableDetails);
return this;
+
}
public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... handlers)
@@ -145,7 +143,7 @@ public class StreamPlan
*/
public boolean isEmpty()
{
- return sessions.isEmpty();
+ return !coordinator.hasActiveSessions();
}
/**
@@ -155,7 +153,7 @@ public class StreamPlan
*/
public StreamResultFuture execute()
{
- return StreamResultFuture.init(planId, description, sessions.values(), handlers);
+ return StreamResultFuture.init(planId, description, handlers, coordinator);
}
/**
@@ -170,16 +168,4 @@ public class StreamPlan
this.flushBeforeTransfer = flushBeforeTransfer;
return this;
}
-
- private StreamSession getOrCreateSession(InetAddress peer)
- {
- StreamSession session = sessions.get(peer);
- if (session == null)
- {
- session = new StreamSession(peer);
- sessions.put(peer, session);
- }
- return session;
- }
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index dcffaff..c04c3f1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -23,10 +23,8 @@ import java.net.Socket;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
-import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,11 +47,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
public final UUID planId;
public final String description;
+ private final StreamCoordinator coordinator;
private final Collection<StreamEventHandler> eventListeners = new ConcurrentLinkedQueue<>();
- private final Map<InetAddress, StreamSession> ongoingSessions;
- private final Map<InetAddress, SessionInfo> sessionStates = new NonBlockingHashMap<>();
-
/**
* Create new StreamResult of given {@code planId} and type.
*
@@ -62,22 +58,25 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
* @param planId Stream plan ID
* @param description Stream description
*/
- private StreamResultFuture(UUID planId, String description, Collection<StreamSession> sessions)
+ private StreamResultFuture(UUID planId, String description, StreamCoordinator coordinator)
{
this.planId = planId;
this.description = description;
- this.ongoingSessions = new HashMap<>(sessions.size());
- for (StreamSession session : sessions)
- this.ongoingSessions.put(session.peer, session);
+ this.coordinator = coordinator;
// if there is no session to listen to, we immediately set result for returning
- if (sessions.isEmpty())
+ if (!coordinator.hasActiveSessions())
set(getCurrentState());
}
- static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions, Collection<StreamEventHandler> listeners)
+ private StreamResultFuture(UUID planId, String description)
+ {
+ this(planId, description, new StreamCoordinator(0));
+ }
+
+ static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator)
{
- StreamResultFuture future = createAndRegister(planId, description, sessions);
+ StreamResultFuture future = createAndRegister(planId, description, coordinator);
if (listeners != null)
{
for (StreamEventHandler listener : listeners)
@@ -85,18 +84,19 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
}
logger.info("[Stream #{}] Executing streaming plan for {}", planId, description);
- // start sessions
- for (final StreamSession session : sessions)
+
+ // Initialize and start all sessions
+ for (final StreamSession session : coordinator.getAllStreamSessions())
{
- logger.info("[Stream #{}] Beginning stream session with {}", planId, session.peer);
session.init(future);
- session.start();
}
+ coordinator.connectAllStreamSessions();
return future;
}
- public static synchronized StreamResultFuture initReceivingSide(UUID planId,
+ public static synchronized StreamResultFuture initReceivingSide(int sessionIndex,
+ UUID planId,
String description,
InetAddress from,
Socket socket,
@@ -106,35 +106,28 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
if (future == null)
{
- final StreamSession session = new StreamSession(from);
+ logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
- future = new StreamResultFuture(planId, description, Collections.singleton(session));
+ future = new StreamResultFuture(planId, description);
StreamManager.instance.registerReceiving(future);
-
- session.init(future);
- session.handler.initiateOnReceivingSide(socket, isForOutgoing, version);
- }
- else
- {
- future.attachSocket(from, socket, isForOutgoing, version);
- logger.info("[Stream #{}] Received streaming plan for {}", planId, description);
}
+ future.attachSocket(from, sessionIndex, socket, isForOutgoing, version);
+ logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, description);
return future;
}
- private static StreamResultFuture createAndRegister(UUID planId, String description, Collection<StreamSession> sessions)
+ private static StreamResultFuture createAndRegister(UUID planId, String description, StreamCoordinator coordinator)
{
- StreamResultFuture future = new StreamResultFuture(planId, description, sessions);
+ StreamResultFuture future = new StreamResultFuture(planId, description, coordinator);
StreamManager.instance.register(future);
return future;
}
- public void attachSocket(InetAddress from, Socket socket, boolean isForOutgoing, int version) throws IOException
+ private void attachSocket(InetAddress from, int sessionIndex, Socket socket, boolean isForOutgoing, int version) throws IOException
{
- StreamSession session = ongoingSessions.get(from);
- if (session == null)
- throw new RuntimeException(String.format("Got connection from %s for stream session %s but no such session locally", from, planId));
+ StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex);
+ session.init(this);
session.handler.initiateOnReceivingSide(socket, isForOutgoing, version);
}
@@ -149,7 +142,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
*/
public StreamState getCurrentState()
{
- return new StreamState(planId, description, ImmutableSet.copyOf(sessionStates.values()));
+ return new StreamState(planId, description, coordinator.getAllSessionInfo());
}
@Override
@@ -170,44 +163,41 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
void handleSessionPrepared(StreamSession session)
{
SessionInfo sessionInfo = session.getSessionInfo();
- logger.info("[Stream #{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
+ logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
session.planId(),
+ session.sessionIndex(),
sessionInfo.getTotalFilesToReceive(),
sessionInfo.getTotalSizeToReceive(),
sessionInfo.getTotalFilesToSend(),
sessionInfo.getTotalSizeToSend());
StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo);
- sessionStates.put(sessionInfo.peer, sessionInfo);
+ coordinator.addSessionInfo(sessionInfo);
fireStreamEvent(event);
}
void handleSessionComplete(StreamSession session)
{
logger.info("[Stream #{}] Session with {} is complete", session.planId(), session.peer);
-
- SessionInfo sessionInfo = session.getSessionInfo();
- sessionStates.put(sessionInfo.peer, sessionInfo);
fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
- maybeComplete(session);
+ maybeComplete();
}
public void handleProgress(ProgressInfo progress)
{
- sessionStates.get(progress.peer).updateProgress(progress);
+ coordinator.updateProgress(progress);
fireStreamEvent(new StreamEvent.ProgressEvent(planId, progress));
}
- void fireStreamEvent(StreamEvent event)
+ synchronized void fireStreamEvent(StreamEvent event)
{
// delegate to listener
for (StreamEventHandler listener : eventListeners)
listener.handleStreamEvent(event);
}
- private synchronized void maybeComplete(StreamSession session)
+ private synchronized void maybeComplete()
{
- ongoingSessions.remove(session.peer);
- if (ongoingSessions.isEmpty())
+ if (!coordinator.hasActiveSessions())
{
StreamState finalState = getCurrentState();
if (finalState.hasFailedSession())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index c5f4cf9..a1c571b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
@@ -113,12 +112,8 @@ import org.apache.cassandra.utils.Pair;
public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
{
private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
-
- // Executor that establish the streaming connection. Once we're connected to the other end, the rest of the streaming
- // is directly handled by the ConnectionHandler incoming and outgoing threads.
- private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
- FBUtilities.getAvailableProcessors());
public final InetAddress peer;
+ private final int index;
// should not be null when session is started
private StreamResultFuture streamResult;
@@ -153,9 +148,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
*
* @param peer Address of streaming peer
*/
- public StreamSession(InetAddress peer)
+ public StreamSession(InetAddress peer, int index)
{
this.peer = peer;
+ this.index = index;
this.handler = new ConnectionHandler(this);
this.metrics = StreamingMetrics.get(peer);
}
@@ -165,6 +161,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
return streamResult == null ? null : streamResult.planId;
}
+ public int sessionIndex()
+ {
+ return index;
+ }
+
public String description()
{
return streamResult == null ? null : streamResult.description;
@@ -194,21 +195,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
return;
}
- streamExecutor.execute(new Runnable()
+ try
{
- public void run()
- {
- try
- {
- handler.initiate();
- onInitializationComplete();
- }
- catch (IOException e)
- {
- onError(e);
- }
- }
- });
+ logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", planId(), sessionIndex(), peer);
+ handler.initiate();
+ onInitializationComplete();
+ }
+ catch (Exception e)
+ {
+ onError(e);
+ }
}
/**
@@ -519,7 +515,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total)
{
- ProgressInfo progress = new ProgressInfo(peer, desc.filenameFor(Component.DATA), direction, bytes, total);
+ ProgressInfo progress = new ProgressInfo(peer, index, desc.filenameFor(Component.DATA), direction, bytes, total);
streamResult.handleProgress(progress);
}
@@ -590,7 +586,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
List<StreamSummary> transferSummaries = Lists.newArrayList();
for (StreamTask transfer : transfers.values())
transferSummaries.add(transfer.getSummary());
- return new SessionInfo(peer, receivingSummaries, transferSummaries, state);
+ return new SessionInfo(peer, index, receivingSummaries, transferSummaries, state);
}
public synchronized void taskCompleted(StreamReceiveTask completedTask)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
index b361b1b..a54498d 100644
--- a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
@@ -32,18 +32,21 @@ public class ProgressInfoCompositeData
{
private static final String[] ITEM_NAMES = new String[]{"planId",
"peer",
+ "sessionIndex",
"fileName",
"direction",
"currentBytes",
"totalBytes"};
private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID",
"Session peer",
+ "Index of session",
"Name of the file",
"Direction('IN' or 'OUT')",
"Current bytes transferred",
"Total bytes to transfer"};
private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
SimpleType.STRING,
+ SimpleType.INTEGER,
SimpleType.STRING,
SimpleType.STRING,
SimpleType.LONG,
@@ -70,10 +73,11 @@ public class ProgressInfoCompositeData
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], planId.toString());
valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress());
- valueMap.put(ITEM_NAMES[2], progressInfo.fileName);
- valueMap.put(ITEM_NAMES[3], progressInfo.direction.name());
- valueMap.put(ITEM_NAMES[4], progressInfo.currentBytes);
- valueMap.put(ITEM_NAMES[5], progressInfo.totalBytes);
+ valueMap.put(ITEM_NAMES[2], progressInfo.sessionIndex);
+ valueMap.put(ITEM_NAMES[3], progressInfo.fileName);
+ valueMap.put(ITEM_NAMES[4], progressInfo.direction.name());
+ valueMap.put(ITEM_NAMES[5], progressInfo.currentBytes);
+ valueMap.put(ITEM_NAMES[6], progressInfo.totalBytes);
try
{
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
@@ -90,10 +94,11 @@ public class ProgressInfoCompositeData
try
{
return new ProgressInfo(InetAddress.getByName((String) values[1]),
- (String) values[2],
- ProgressInfo.Direction.valueOf((String)values[3]),
- (long) values[4],
- (long) values[5]);
+ (int) values[2],
+ (String) values[3],
+ ProgressInfo.Direction.valueOf((String)values[4]),
+ (long) values[5],
+ (long) values[6]);
}
catch (UnknownHostException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
index 658facf..8618cca 100644
--- a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@ -129,6 +129,7 @@ public class SessionInfoCompositeData
}
};
SessionInfo info = new SessionInfo(peer,
+ (int)values[2],
fromArrayOfCompositeData((CompositeData[]) values[2], toStreamSummary),
fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
StreamSession.State.valueOf((String) values[4]));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 947e5d5..a9ec4ae 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -40,15 +40,17 @@ public class StreamInitMessage
public static IVersionedSerializer<StreamInitMessage> serializer = new StreamInitMessageSerializer();
public final InetAddress from;
+ public final int sessionIndex;
public final UUID planId;
public final String description;
// true if this init message is to connect for outgoing message on receiving side
public final boolean isForOutgoing;
- public StreamInitMessage(InetAddress from, UUID planId, String description, boolean isForOutgoing)
+ public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing)
{
this.from = from;
+ this.sessionIndex = sessionIndex;
this.planId = planId;
this.description = description;
this.isForOutgoing = isForOutgoing;
@@ -99,6 +101,7 @@ public class StreamInitMessage
public void serialize(StreamInitMessage message, DataOutputPlus out, int version) throws IOException
{
CompactEndpointSerializationHelper.serialize(message.from, out);
+ out.writeInt(message.sessionIndex);
UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version);
out.writeUTF(message.description);
out.writeBoolean(message.isForOutgoing);
@@ -107,15 +110,17 @@ public class StreamInitMessage
public StreamInitMessage deserialize(DataInput in, int version) throws IOException
{
InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
+ int sessionIndex = in.readInt();
UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
String description = in.readUTF();
boolean sentByInitiator = in.readBoolean();
- return new StreamInitMessage(from, planId, description, sentByInitiator);
+ return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator);
}
public long serializedSize(StreamInitMessage message, int version)
{
long size = CompactEndpointSerializationHelper.serializedSize(message.from);
+ size += TypeSizes.NATIVE.sizeof(message.sessionIndex);
size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
size += TypeSizes.NATIVE.sizeof(message.description);
size += TypeSizes.NATIVE.sizeof(message.isForOutgoing);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 50c3911..360b59e 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.streaming.StreamSession;
public abstract class StreamMessage
{
/** Streaming protocol version */
- public static final int CURRENT_VERSION = 1;
+ public static final int CURRENT_VERSION = 2;
public static void serialize(StreamMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 01952e5..6042f92 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -21,12 +21,10 @@ import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import com.google.common.base.Joiner;
-import com.google.common.collect.Sets;
-import org.apache.cassandra.config.EncryptionOptions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import org.apache.commons.cli.*;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -35,6 +33,7 @@ import org.apache.thrift.transport.TTransport;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.marshal.UTF8Type;
@@ -50,7 +49,6 @@ public class BulkLoader
{
private static final String TOOL_NAME = "sstableloader";
private static final String VERBOSE_OPTION = "verbose";
- private static final String DEBUG_OPTION = "debug";
private static final String HELP_OPTION = "help";
private static final String NOPROGRESS_OPTION = "no-progress";
private static final String IGNORE_NODES_OPTION = "ignore";
@@ -68,47 +66,63 @@ public class BulkLoader
private static final String SSL_ALGORITHM = "ssl-alg";
private static final String SSL_STORE_TYPE = "store-type";
private static final String SSL_CIPHER_SUITES = "ssl-ciphers";
+ private static final String CONNECTIONS_PER_HOST = "connections-per-host";
public static void main(String args[])
{
LoaderOptions options = LoaderOptions.parseArgs(args);
- OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
- SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd, options.transportFactory), handler);
+ OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, true);
+ SSTableLoader loader = new SSTableLoader(
+ options.directory,
+ new ExternalClient(
+ options.hosts,
+ options.rpcPort,
+ options.user,
+ options.passwd,
+ options.transportFactory),
+ handler,
+ options.connectionsPerHost);
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
StreamResultFuture future = null;
+
+ ProgressIndicator indicator = new ProgressIndicator();
try
{
if (options.noProgress)
+ {
future = loader.stream(options.ignores);
+ }
else
- future = loader.stream(options.ignores, new ProgressIndicator());
+ {
+ future = loader.stream(options.ignores, indicator);
+ }
+
}
catch (Exception e)
{
System.err.println(e.getMessage());
if (e.getCause() != null)
System.err.println(e.getCause());
- if (options.debug)
- e.printStackTrace(System.err);
- else
- System.err.println("Run with --debug to get full stack trace or --help to get help.");
+ e.printStackTrace(System.err);
System.exit(1);
}
- handler.output(String.format("Streaming session ID: %s", future.planId));
-
try
{
future.get();
+
+ if (!options.noProgress)
+ indicator.printSummary(options.connectionsPerHost);
+
+ // Give sockets time to gracefully close
+ Thread.sleep(1000);
System.exit(0); // We need that to stop non daemonized threads
}
catch (Exception e)
{
System.err.println("Streaming to the following hosts failed:");
System.err.println(loader.getFailedHosts());
- System.err.println(e);
- if (options.debug)
- e.printStackTrace(System.err);
+ e.printStackTrace(System.err);
System.exit(1);
}
}
@@ -116,13 +130,15 @@ public class BulkLoader
// Return true when everything is at 100%
static class ProgressIndicator implements StreamEventHandler
{
- private final Map<InetAddress, SessionInfo> sessionsByHost = new ConcurrentHashMap<>();
- private final Map<InetAddress, Set<ProgressInfo>> progressByHost = new ConcurrentHashMap<>();
-
private long start;
private long lastProgress;
private long lastTime;
+ private int peak = 0;
+ private int totalFiles = 0;
+
+ private final Multimap<InetAddress, SessionInfo> sessionsByHost = HashMultimap.create();
+
public ProgressIndicator()
{
start = lastTime = System.nanoTime();
@@ -131,70 +147,100 @@ public class BulkLoader
public void onSuccess(StreamState finalState) {}
public void onFailure(Throwable t) {}
- public void handleStreamEvent(StreamEvent event)
+ public synchronized void handleStreamEvent(StreamEvent event)
{
if (event.eventType == StreamEvent.Type.STREAM_PREPARED)
{
SessionInfo session = ((StreamEvent.SessionPreparedEvent) event).session;
sessionsByHost.put(session.peer, session);
}
- else if (event.eventType == StreamEvent.Type.FILE_PROGRESS)
+ else if (event.eventType == StreamEvent.Type.FILE_PROGRESS || event.eventType == StreamEvent.Type.STREAM_COMPLETE)
{
- ProgressInfo progressInfo = ((StreamEvent.ProgressEvent) event).progress;
-
- // update progress
- Set<ProgressInfo> progresses = progressByHost.get(progressInfo.peer);
- if (progresses == null)
+ ProgressInfo progressInfo = null;
+ if (event.eventType == StreamEvent.Type.FILE_PROGRESS)
{
- progresses = Sets.newSetFromMap(new ConcurrentHashMap<ProgressInfo, Boolean>());
- progressByHost.put(progressInfo.peer, progresses);
+ progressInfo = ((StreamEvent.ProgressEvent) event).progress;
}
- if (progresses.contains(progressInfo))
- progresses.remove(progressInfo);
- progresses.add(progressInfo);
+
+ long time = System.nanoTime();
+ long deltaTime = time - lastTime;
StringBuilder sb = new StringBuilder();
sb.append("\rprogress: ");
long totalProgress = 0;
long totalSize = 0;
- for (Map.Entry<InetAddress, Set<ProgressInfo>> entry : progressByHost.entrySet())
+
+ boolean updateTotalFiles = totalFiles == 0;
+ // recalculate progress across all sessions in all hosts and display
+ for (InetAddress peer : sessionsByHost.keySet())
{
- SessionInfo session = sessionsByHost.get(entry.getKey());
+ sb.append("[").append(peer.toString()).append("]");
- long size = session.getTotalSizeToSend();
- long current = 0;
- int completed = 0;
- for (ProgressInfo progress : entry.getValue())
+ for (SessionInfo session : sessionsByHost.get(peer))
{
- if (progress.currentBytes == progress.totalBytes)
- completed++;
- current += progress.currentBytes;
+ long size = session.getTotalSizeToSend();
+ long current = 0;
+ int completed = 0;
+
+ if (progressInfo != null && session.peer.equals(progressInfo.peer) && (session.sessionIndex == progressInfo.sessionIndex))
+ {
+ session.updateProgress(progressInfo);
+ }
+ for (ProgressInfo progress : session.getSendingFiles())
+ {
+ if (progress.isCompleted())
+ completed++;
+ current += progress.currentBytes;
+ }
+ totalProgress += current;
+
+ totalSize += size;
+
+ sb.append(session.sessionIndex).append(":");
+ sb.append(completed).append("/").append(session.getTotalFilesToSend());
+ sb.append(" ").append(String.format("%-3d", size == 0 ? 100L : current * 100L / size)).append("% ");
+
+ if (updateTotalFiles)
+ totalFiles += session.getTotalFilesToSend();
}
- totalProgress += current;
- totalSize += size;
- sb.append("[").append(entry.getKey());
- sb.append(" ").append(completed).append("/").append(session.getTotalFilesToSend());
- sb.append(" (").append(size == 0 ? 100L : current * 100L / size).append("%)] ");
}
- long time = System.nanoTime();
- long deltaTime = TimeUnit.NANOSECONDS.toMillis(time - lastTime);
+
lastTime = time;
long deltaProgress = totalProgress - lastProgress;
lastProgress = totalProgress;
- sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append("% - ");
- sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
- sb.append(" (avg: ").append(mbPerSec(totalProgress, TimeUnit.NANOSECONDS.toMillis(time - start))).append("MB/s)]");
+ sb.append("total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append("% ");
+ sb.append(String.format("%-3d", mbPerSec(deltaProgress, deltaTime))).append("MB/s");
+ int average = mbPerSec(totalProgress, (time - start));
+ if (average > peak)
+ peak = average;
+ sb.append("(avg: ").append(average).append(" MB/s)");
- System.out.print(sb.toString());
+ System.err.print(sb.toString());
}
}
- private int mbPerSec(long bytes, long timeInMs)
+ private int mbPerSec(long bytes, long timeInNano)
{
- double bytesPerMs = ((double)bytes) / timeInMs;
- return (int)((bytesPerMs * 1000) / (1024 * 2024));
+ double bytesPerNano = ((double)bytes) / timeInNano;
+ return (int)((bytesPerNano * 1000 * 1000 * 1000) / (1024 * 2024));
+ }
+
+ private void printSummary(int connectionsPerHost)
+ {
+ long end = System.nanoTime();
+ long durationMS = ((end - start) / (1000000));
+ int average = mbPerSec(lastProgress, (end - start));
+ StringBuilder sb = new StringBuilder();
+ sb.append("\nSummary statistics: \n");
+ sb.append(String.format(" %-30s: %-10d\n", "Connections per host: ", connectionsPerHost));
+ sb.append(String.format(" %-30s: %-10d\n", "Total files transferred: ", totalFiles));
+ sb.append(String.format(" %-30s: %-10d\n", "Total bytes transferred: ", lastProgress));
+ sb.append(String.format(" %-30s: %-10d\n", "Total duration (ms): ", durationMS));
+ sb.append(String.format(" %-30s: %-10d\n", "Average transfer rate (MB/s): ", average));
+ sb.append(String.format(" %-30s: %-10d\n", "Peak transfer rate (MB/s): ", peak));
+ System.err.println(sb.toString());
}
}
@@ -282,7 +328,7 @@ public class BulkLoader
Cassandra.Client client = new Cassandra.Client(protocol);
if (user != null && passwd != null)
{
- Map<String, String> credentials = new HashMap<String, String>();
+ Map<String, String> credentials = new HashMap<>();
credentials.put(IAuthenticator.USERNAME_KEY, user);
credentials.put(IAuthenticator.PASSWORD_KEY, passwd);
AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
@@ -305,9 +351,10 @@ public class BulkLoader
public int throttle = 0;
public ITransportFactory transportFactory = new TFramedTransportFactory();
public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
+ public int connectionsPerHost = 1;
- public final Set<InetAddress> hosts = new HashSet<InetAddress>();
- public final Set<InetAddress> ignores = new HashSet<InetAddress>();
+ public final Set<InetAddress> hosts = new HashSet<>();
+ public final Set<InetAddress> ignores = new HashSet<>();
LoaderOptions(File directory)
{
@@ -354,7 +401,6 @@ public class BulkLoader
LoaderOptions opts = new LoaderOptions(dir);
- opts.debug = cmd.hasOption(DEBUG_OPTION);
opts.verbose = cmd.hasOption(VERBOSE_OPTION);
opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
@@ -409,6 +455,9 @@ public class BulkLoader
}
}
+ if (cmd.hasOption(CONNECTIONS_PER_HOST))
+ opts.connectionsPerHost = Integer.parseInt(cmd.getOptionValue(CONNECTIONS_PER_HOST));
+
if(cmd.hasOption(SSL_TRUSTSTORE))
{
opts.encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE);
@@ -520,7 +569,6 @@ public class BulkLoader
private static CmdLineOptions getCmdLineOptions()
{
CmdLineOptions options = new CmdLineOptions();
- options.addOption(null, DEBUG_OPTION, "display stack traces");
options.addOption("v", VERBOSE_OPTION, "verbose output");
options.addOption("h", HELP_OPTION, "display this help message");
options.addOption(null, NOPROGRESS_OPTION, "don't display progress");
@@ -531,6 +579,7 @@ public class BulkLoader
options.addOption("u", USER_OPTION, "username", "username for cassandra authentication");
options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
options.addOption("tf", TRANSPORT_FACTORY, "transport factory", "Fully-qualified ITransportFactory class name for creating a connection to cassandra");
+ options.addOption("cph", CONNECTIONS_PER_HOST, "connectionsPerHost", "number of concurrent connections-per-host.");
// ssl connection-related options
options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore");
options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "SSL: password of the truststore");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
index 60fbf40..f015a01 100644
--- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
@@ -46,7 +46,7 @@ public class SessionInfoTest
}
StreamSummary sending = new StreamSummary(cfId, 10, 100);
- SessionInfo info = new SessionInfo(local, summaries, Collections.singleton(sending), StreamSession.State.PREPARING);
+ SessionInfo info = new SessionInfo(local, 0, summaries, Collections.singleton(sending), StreamSession.State.PREPARING);
assert info.getTotalFilesToReceive() == 45;
assert info.getTotalFilesToSend() == 10;
@@ -57,13 +57,13 @@ public class SessionInfoTest
assert info.getTotalFilesSent() == 0;
// receive in progress
- info.updateProgress(new ProgressInfo(local, "test.txt", ProgressInfo.Direction.IN, 50, 100));
+ info.updateProgress(new ProgressInfo(local, 0, "test.txt", ProgressInfo.Direction.IN, 50, 100));
// still in progress, but not completed yet
assert info.getTotalSizeReceived() == 50;
assert info.getTotalSizeSent() == 0;
assert info.getTotalFilesReceived() == 0;
assert info.getTotalFilesSent() == 0;
- info.updateProgress(new ProgressInfo(local, "test.txt", ProgressInfo.Direction.IN, 100, 100));
+ info.updateProgress(new ProgressInfo(local, 0, "test.txt", ProgressInfo.Direction.IN, 100, 100));
// 1 file should be completed
assert info.getTotalSizeReceived() == 100;
assert info.getTotalSizeSent() == 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44fa2cdb/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index e22fa8f..c6c04b0 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -43,7 +43,7 @@ public class StreamTransferTaskTest extends SchemaLoader
String ks = "Keyspace1";
String cf = "Standard1";
- StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress());
+ StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), 0);
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
// create two sstables
[3/3] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ed57bf5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ed57bf5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ed57bf5
Branch: refs/heads/trunk
Commit: 0ed57bf5317ad259f39f4d3ff97a651d5c18d7cc
Parents: f0def34 44fa2cd
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu May 1 17:51:54 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu May 1 17:51:54 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableLoader.java | 9 +-
.../net/IncomingStreamingConnection.java | 2 +-
.../cassandra/repair/StreamingRepairTask.java | 2 +-
.../cassandra/streaming/ConnectionHandler.java | 7 +-
.../cassandra/streaming/ProgressInfo.java | 10 +-
.../apache/cassandra/streaming/SessionInfo.java | 3 +
.../cassandra/streaming/StreamCoordinator.java | 272 +++++++++++++++++++
.../apache/cassandra/streaming/StreamEvent.java | 2 +
.../apache/cassandra/streaming/StreamPlan.java | 34 +--
.../cassandra/streaming/StreamResultFuture.java | 80 +++---
.../cassandra/streaming/StreamSession.java | 42 ++-
.../management/ProgressInfoCompositeData.java | 21 +-
.../management/SessionInfoCompositeData.java | 1 +
.../streaming/messages/StreamInitMessage.java | 9 +-
.../streaming/messages/StreamMessage.java | 2 +-
.../org/apache/cassandra/tools/BulkLoader.java | 169 ++++++++----
.../cassandra/streaming/SessionInfoTest.java | 6 +-
.../streaming/StreamTransferTaskTest.java | 2 +-
19 files changed, 500 insertions(+), 174 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ed57bf5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0c32f3c,86940ea..5ec4c89
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,5 +1,13 @@@
+3.0
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7208)
+
2.1.0-rc1
+ * Parallel streaming for sstableloader (CASSANDRA-3668)
Merged from 2.0:
* Add Google Compute Engine snitch (CASSANDRA-7132)