You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/20 19:07:17 UTC
[5/5] git commit: Streaming 2.0
Streaming 2.0
patch by Yukim; reviewed by slebresne for CASSANDRA-5286
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/51511697
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/51511697
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/51511697
Branch: refs/heads/trunk
Commit: 51511697254615b570f4162bbcd2baee23a234e9
Parents: 40b6c5d
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue May 28 20:28:13 2013 -0500
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Jun 20 19:05:24 2013 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/dht/BootStrapper.java | 19 +-
.../org/apache/cassandra/dht/RangeStreamer.java | 69 +--
.../cassandra/hadoop/BulkRecordWriter.java | 15 +-
.../cassandra/io/sstable/SSTableLoader.java | 145 ++---
.../net/IncomingStreamingConnection.java | 76 +++
.../cassandra/net/IncomingTcpConnection.java | 54 +-
.../apache/cassandra/net/MessagingService.java | 102 +---
.../cassandra/service/StorageService.java | 273 +++------
.../streaming/AbstractStreamSession.java | 114 ----
.../cassandra/streaming/ConnectionHandler.java | 309 ++++++++++
.../cassandra/streaming/FileStreamTask.java | 291 ---------
.../cassandra/streaming/IStreamCallback.java | 36 --
.../streaming/IncomingStreamReader.java | 200 -------
.../cassandra/streaming/OperationType.java | 32 -
.../apache/cassandra/streaming/PendingFile.java | 180 ------
.../cassandra/streaming/ProgressInfo.java | 110 ++++
.../apache/cassandra/streaming/SessionInfo.java | 187 ++++++
.../apache/cassandra/streaming/StreamEvent.java | 81 +++
.../cassandra/streaming/StreamEventHandler.java | 31 +
.../cassandra/streaming/StreamException.java | 35 ++
.../cassandra/streaming/StreamHeader.java | 94 ---
.../apache/cassandra/streaming/StreamIn.java | 89 ---
.../cassandra/streaming/StreamInSession.java | 251 --------
.../cassandra/streaming/StreamManager.java | 94 +++
.../cassandra/streaming/StreamManagerMBean.java | 32 +
.../apache/cassandra/streaming/StreamOut.java | 187 ------
.../cassandra/streaming/StreamOutSession.java | 164 -----
.../apache/cassandra/streaming/StreamPlan.java | 169 ++++++
.../cassandra/streaming/StreamReader.java | 120 ++++
.../cassandra/streaming/StreamReceiveTask.java | 95 +++
.../apache/cassandra/streaming/StreamReply.java | 91 ---
.../streaming/StreamReplyVerbHandler.java | 63 --
.../cassandra/streaming/StreamRequest.java | 174 ++----
.../streaming/StreamRequestVerbHandler.java | 47 --
.../cassandra/streaming/StreamResultFuture.java | 172 ++++++
.../cassandra/streaming/StreamSession.java | 591 +++++++++++++++++++
.../apache/cassandra/streaming/StreamState.java | 53 ++
.../cassandra/streaming/StreamSummary.java | 107 ++++
.../apache/cassandra/streaming/StreamTask.java | 55 ++
.../cassandra/streaming/StreamTransferTask.java | 83 +++
.../cassandra/streaming/StreamWriter.java | 158 +++++
.../streaming/StreamingRepairTask.java | 75 +--
.../cassandra/streaming/StreamingService.java | 110 ----
.../streaming/StreamingServiceMBean.java | 41 --
.../compress/CompressedFileStreamTask.java | 160 -----
.../compress/CompressedInputStream.java | 7 +-
.../compress/CompressedStreamReader.java | 112 ++++
.../compress/CompressedStreamWriter.java | 129 ++++
.../streaming/messages/CompleteMessage.java | 42 ++
.../streaming/messages/FileMessage.java | 106 ++++
.../streaming/messages/FileMessageHeader.java | 151 +++++
.../streaming/messages/PrepareMessage.java | 91 +++
.../streaming/messages/RetryMessage.java | 57 ++
.../messages/SessionFailedMessage.java | 42 ++
.../streaming/messages/StreamInitMessage.java | 115 ++++
.../streaming/messages/StreamMessage.java | 109 ++++
.../org/apache/cassandra/tools/BulkLoader.java | 151 ++---
.../org/apache/cassandra/tools/NodeCmd.java | 61 +-
.../org/apache/cassandra/tools/NodeProbe.java | 29 +-
.../apache/cassandra/dht/BootStrapperTest.java | 3 +-
.../apache/cassandra/service/RemoveTest.java | 38 +-
.../cassandra/streaming/BootstrapTest.java | 54 --
.../cassandra/streaming/SerializationsTest.java | 220 -------
.../cassandra/streaming/SessionInfoTest.java | 73 +++
.../apache/cassandra/streaming/StreamUtil.java | 46 --
.../streaming/StreamingTransferTest.java | 198 ++++---
.../compress/CompressedInputStreamTest.java | 5 +-
68 files changed, 4068 insertions(+), 3406 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7e1bf77..2b21d05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -60,6 +60,7 @@
* cqlsh: Add row count to SELECT output (CASSANDRA-5636)
* Include a timestamp with all read commands to determine column expiration
(CASSANDRA-5149)
+ * Streaming 2.0 (CASSANDRA-5286)
1.2.6
* Fix cross-DC mutation forwarding (CASSANDRA-5632)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 28615cf..55d82e1 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
+import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,6 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.OperationType;
public class BootStrapper
{
@@ -63,7 +63,7 @@ public class BootStrapper
if (logger.isDebugEnabled())
logger.debug("Beginning bootstrap process");
- RangeStreamer streamer = new RangeStreamer(tokenMetadata, address, OperationType.BOOTSTRAP);
+ RangeStreamer streamer = new RangeStreamer(tokenMetadata, address, "Bootstrap");
streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
for (String table : Schema.instance.getNonSystemTables())
@@ -72,8 +72,19 @@ public class BootStrapper
streamer.addRanges(table, strategy.getPendingAddressRanges(tokenMetadata, tokens, address));
}
- streamer.fetch();
- StorageService.instance.finishBootstrapping();
+ try
+ {
+ streamer.fetchAsync().get();
+ StorageService.instance.finishBootstrapping();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException("Interrupted while waiting on boostrap to complete. Bootstrap will have to be restarted.");
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException("Error during boostrap: " + e.getCause().getMessage(), e.getCause());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index a0e1a93..2196f4d 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -19,14 +19,10 @@ package org.apache.cassandra.dht;
import java.net.InetAddress;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-import org.apache.cassandra.streaming.IStreamCallback;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +34,9 @@ import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.streaming.OperationType;
-import org.apache.cassandra.streaming.StreamIn;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.utils.FBUtilities;
/**
* Assists in streaming ranges to a node.
@@ -50,12 +47,10 @@ public class RangeStreamer
private final TokenMetadata metadata;
private final InetAddress address;
- private final OperationType opType;
+ private final String description;
private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create();
private final Set<ISourceFilter> sourceFilters = new HashSet<ISourceFilter>();
- // protected for testing.
- protected CountDownLatch latch;
- private Set<Range<Token>> completed = Collections.newSetFromMap(new ConcurrentHashMap<Range<Token>, Boolean>());
+ private final StreamPlan streamPlan;
/**
* A filter applied to sources to stream from when constructing a fetch map.
@@ -104,11 +99,12 @@ public class RangeStreamer
}
}
- public RangeStreamer(TokenMetadata metadata, InetAddress address, OperationType opType)
+ public RangeStreamer(TokenMetadata metadata, InetAddress address, String description)
{
this.metadata = metadata;
this.address = address;
- this.opType = opType;
+ this.description = description;
+ this.streamPlan = new StreamPlan(description);
}
public void addSourceFilter(ISourceFilter filter)
@@ -123,7 +119,7 @@ public class RangeStreamer
if (logger.isDebugEnabled())
{
for (Map.Entry<Range<Token>, InetAddress> entry: rangesForTable.entries())
- logger.debug(String.format("%s: range %s exists on %s", opType, entry.getKey(), entry.getValue()));
+ logger.debug(String.format("%s: range %s exists on %s", description, entry.getKey(), entry.getValue()));
}
for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : getRangeFetchMap(rangesForTable, sourceFilters).asMap().entrySet())
@@ -131,7 +127,7 @@ public class RangeStreamer
if (logger.isDebugEnabled())
{
for (Range r : entry.getValue())
- logger.debug(String.format("%s: range %s from source %s for table %s", opType, r, entry.getKey(), table));
+ logger.debug(String.format("%s: range %s from source %s for table %s", description, r, entry.getKey(), table));
}
toFetch.put(table, entry);
}
@@ -219,50 +215,19 @@ public class RangeStreamer
return toFetch;
}
- public void fetch()
+ public StreamResultFuture fetchAsync()
{
- latch = new CountDownLatch(toFetch.entries().size());
-
for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries())
{
- final String table = entry.getKey();
- final InetAddress source = entry.getValue().getKey();
- final Collection<Range<Token>> ranges = entry.getValue().getValue();
+ String keyspace = entry.getKey();
+ InetAddress source = entry.getValue().getKey();
+ Collection<Range<Token>> ranges = entry.getValue().getValue();
/* Send messages to respective folks to stream data over to me */
- IStreamCallback callback = new IStreamCallback()
- {
- public void onSuccess()
- {
- completed.addAll(ranges);
- latch.countDown();
- if (logger.isDebugEnabled())
- logger.debug(String.format("Removed %s/%s as a %s source; remaining is %s",
- source, table, opType, latch.getCount()));
- }
-
- public void onFailure()
- {
- latch.countDown();
- logger.warn("Streaming from " + source + " failed");
- }
- };
if (logger.isDebugEnabled())
- logger.debug("" + opType + "ing from " + source + " ranges " + StringUtils.join(ranges, ", "));
- StreamIn.requestRanges(source, table, ranges, callback, opType);
+ logger.debug("" + description + "ing from " + source + " ranges " + StringUtils.join(ranges, ", "));
+ streamPlan.requestRanges(source, keyspace, ranges);
}
- try
- {
- latch.await();
- for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>> entry : toFetch.entries())
- {
- if (!completed.containsAll(entry.getValue().getValue()))
- throw new RuntimeException(String.format("Unable to fetch range %s for keyspace %s from any hosts", entry.getValue().getValue(), entry.getKey()));
- }
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
+ return streamPlan.execute();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index de6d2e0..704d19f 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -23,6 +23,8 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.net.UnknownHostException;
import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -38,6 +40,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
+import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.OutputHandler;
import org.apache.hadoop.conf.Configuration;
@@ -221,7 +224,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
if (writer != null)
{
writer.close();
- SSTableLoader.LoaderFuture future = loader.stream();
+ Future<StreamState> future = loader.stream();
while (true)
{
try
@@ -229,7 +232,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
future.get(1000, TimeUnit.MILLISECONDS);
break;
}
- catch (TimeoutException te)
+ catch (ExecutionException | TimeoutException te)
{
progress.progress();
}
@@ -238,12 +241,12 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
throw new IOException(e);
}
}
- if (future.hadFailures())
+ if (loader.getFailedHosts().size() > 0)
{
- if (future.getFailedHosts().size() > maxFailures)
- throw new IOException("Too many hosts failed: " + future.getFailedHosts());
+ if (loader.getFailedHosts().size() > maxFailures)
+ throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
else
- logger.warn("Some hosts failed: " + future.getFailedHosts());
+ logger.warn("Some hosts failed: " + loader.getFailedHosts());
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 56bb1d4..4d54cd9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -22,30 +22,29 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.cassandra.config.Config;
-import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.Pair;
/**
* Cassandra SSTable bulk loader.
* Load an externally created sstable into a cluster.
*/
-public class SSTableLoader
+public class SSTableLoader implements StreamEventHandler
{
private final File directory;
private final String keyspace;
private final Client client;
private final OutputHandler outputHandler;
+ private final Set<InetAddress> failedHosts = new HashSet<>();
static
{
@@ -109,110 +108,61 @@ public class SSTableLoader
return sstables;
}
- public LoaderFuture stream()
+ public StreamResultFuture stream()
{
return stream(Collections.<InetAddress>emptySet());
}
- public LoaderFuture stream(Set<InetAddress> toIgnore)
+ public StreamResultFuture stream(Set<InetAddress> toIgnore)
{
client.init(keyspace);
+ StreamPlan plan = new StreamPlan("Bulk Load");
Collection<SSTableReader> sstables = openSSTables();
if (sstables.isEmpty())
{
- outputHandler.output("No sstables to stream");
- return new LoaderFuture(0);
+ // return empty result
+ return plan.execute();
}
-
Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
- outputHandler.output(String.format("Streaming revelant part of %s to %s", names(sstables), endpointToRanges.keySet()));
+ outputHandler.output(String.format("Streaming relevant part of %sto %s", names(sstables), endpointToRanges.keySet()));
- // There will be one streaming session by endpoint
- LoaderFuture future = new LoaderFuture(endpointToRanges.size());
for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : endpointToRanges.entrySet())
{
InetAddress remote = entry.getKey();
if (toIgnore.contains(remote))
- {
- future.latch.countDown();
continue;
- }
Collection<Range<Token>> ranges = entry.getValue();
- StreamOutSession session = StreamOutSession.create(keyspace, remote, new CountDownCallback(future, remote));
// transferSSTables assumes references have been acquired
SSTableReader.acquireReferences(sstables);
- StreamOut.transferSSTables(session, sstables, ranges, OperationType.BULK_LOAD);
- future.setPendings(remote, session.getFiles());
+ plan.transferFiles(remote, ranges, sstables);
}
- return future;
+ StreamResultFuture bulkResult = plan.execute();
+ bulkResult.addEventListener(this);
+ return bulkResult;
}
- public static class LoaderFuture implements Future<Void>
- {
- final CountDownLatch latch;
- final Map<InetAddress, Collection<PendingFile>> pendingFiles;
- private List<InetAddress> failedHosts = new ArrayList<InetAddress>();
-
- private LoaderFuture(int request)
- {
- latch = new CountDownLatch(request);
- pendingFiles = new HashMap<InetAddress, Collection<PendingFile>>();
- }
-
- private void setPendings(InetAddress remote, Collection<PendingFile> files)
- {
- pendingFiles.put(remote, new ArrayList(files));
- }
+ public void onSuccess(StreamState finalState) {}
+ public void onFailure(Throwable t) {}
- private void setFailed(InetAddress addr)
- {
- failedHosts.add(addr);
- }
-
- public List<InetAddress> getFailedHosts()
- {
- return failedHosts;
- }
-
- public boolean cancel(boolean mayInterruptIfRunning)
- {
- throw new UnsupportedOperationException("Cancellation is not yet supported");
- }
-
- public Void get() throws InterruptedException
- {
- latch.await();
- return null;
- }
-
- public Void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
- {
- if (latch.await(timeout, unit))
- return null;
- else
- throw new TimeoutException();
- }
-
- public boolean isCancelled()
- {
- // For now, cancellation is not supported, maybe one day...
- return false;
- }
-
- public boolean isDone()
- {
- return latch.getCount() == 0;
- }
-
- public boolean hadFailures()
+ public void handleStreamEvent(StreamEvent event)
+ {
+ if (event.eventType == StreamEvent.Type.FILE_PROGRESS)
{
- return failedHosts.size() > 0;
+ ProgressInfo progress = ((StreamEvent.ProgressEvent) event).progress;
+ StringBuilder sb = new StringBuilder("\r");
+ sb.append(progress.fileName);
+ sb.append(": ");
+ sb.append(progress.currentBytes).append("/").append(progress.totalBytes);
+ System.out.print(sb.toString());
+ if (progress.currentBytes == progress.totalBytes)
+ System.out.println();
}
-
- public Map<InetAddress, Collection<PendingFile>> getPendingFiles()
+ else if (event.eventType == StreamEvent.Type.STREAM_COMPLETE)
{
- return pendingFiles;
+ StreamEvent.SessionCompleteEvent se = (StreamEvent.SessionCompleteEvent) event;
+ if (!se.success)
+ failedHosts.add(se.peer);
}
}
@@ -224,34 +174,9 @@ public class SSTableLoader
return builder.toString();
}
- private class CountDownCallback implements IStreamCallback
+ public Set<InetAddress> getFailedHosts()
{
- private final InetAddress endpoint;
- private final LoaderFuture future;
-
- CountDownCallback(LoaderFuture future, InetAddress endpoint)
- {
- this.future = future;
- this.endpoint = endpoint;
- }
-
- public void onSuccess()
- {
- future.latch.countDown();
- outputHandler.debug(String.format("Streaming session to %s completed (waiting on %d outstanding sessions)", endpoint, future.latch.getCount()));
-
- // There could be race with stop being called twice but it should be ok
- if (future.latch.getCount() == 0)
- client.stop();
- }
-
- public void onFailure()
- {
- outputHandler.output(String.format("Streaming session to %s failed", endpoint));
- future.setFailed(endpoint);
- future.latch.countDown();
- client.stop();
- }
+ return failedHosts;
}
public static abstract class Client
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
new file mode 100644
index 0000000..dd5b7b4
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.Socket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamInitMessage;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+
+/**
+ * Thread that reads the {@link StreamInitMessage} from a newly accepted socket and hands the socket over to {@link StreamSession}.
+ */
+public class IncomingStreamingConnection extends Thread
+{
+    private static final Logger logger = LoggerFactory.getLogger(IncomingStreamingConnection.class);
+
+    private final int version; // protocol version passed in by the caller; must equal StreamMessage.CURRENT_VERSION or run() drops the connection
+    private final Socket socket; // connection the init message is read from; closed by run() only on IOException, otherwise passed to StreamSession
+
+    public IncomingStreamingConnection(int version, Socket socket)
+    {
+        super("stream-init " + socket.getRemoteSocketAddress());
+        this.version = version;
+        this.socket = socket;
+    }
+
+    @Override
+    public void run()
+    {
+        try
+        {
+            // streaming connections are per-session and have a fixed version. we can't do anything with a wrong-version stream connection, so drop it (reporting the streaming version we actually compared against).
+            if (version != StreamMessage.CURRENT_VERSION)
+                throw new IOException(String.format("Received stream using protocol version %d (my version %d). Terminating connection", version, StreamMessage.CURRENT_VERSION));
+
+            DataInput input = new DataInputStream(socket.getInputStream());
+            StreamInitMessage init = StreamInitMessage.serializer.deserialize(input, version);
+
+            StreamSession.startReceivingStreamAsync(init.planId, init.description, socket, version);
+        }
+        catch (IOException e)
+        {
+            logger.debug("IOException reading from socket; closing", e);
+            try
+            {
+                socket.close();
+            }
+            catch (IOException e2)
+            {
+                logger.debug("error closing socket", e2);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 1977086..4130006 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -24,24 +24,25 @@ import java.net.SocketException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.xerial.snappy.SnappyInputStream;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.streaming.IncomingStreamReader;
-import org.apache.cassandra.streaming.StreamHeader;
-import org.xerial.snappy.SnappyInputStream;
public class IncomingTcpConnection extends Thread
{
private static final Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class);
+ private final int version;
+ private final boolean compressed;
private final Socket socket;
public InetAddress from;
- public IncomingTcpConnection(Socket socket)
+ public IncomingTcpConnection(int version, boolean compressed, Socket socket)
{
assert socket != null;
+ this.version = version;
+ this.compressed = compressed;
this.socket = socket;
if (DatabaseDescriptor.getInternodeRecvBufferSize() != null)
{
@@ -66,20 +67,10 @@ public class IncomingTcpConnection extends Thread
{
try
{
- // determine the connection type to decide whether to buffer
- DataInputStream in = new DataInputStream(socket.getInputStream());
- MessagingService.validateMagic(in.readInt());
- int header = in.readInt();
- boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
- int version = MessagingService.getBits(header, 15, 8);
- logger.debug("Connection version {} from {}", version, socket.getInetAddress());
-
- if (isStream)
- handleStream(in, version);
- else if (version < MessagingService.VERSION_12)
- handleLegacyVersion(version);
+ if (version < MessagingService.VERSION_12)
+ handleLegacyVersion();
else
- handleModernVersion(version, header);
+ handleModernVersion();
}
catch (EOFException e)
{
@@ -96,7 +87,7 @@ public class IncomingTcpConnection extends Thread
}
}
- private void handleModernVersion(int version, int header) throws IOException
+ private void handleModernVersion() throws IOException
{
DataOutputStream out = new DataOutputStream(socket.getOutputStream());
out.writeInt(MessagingService.current_version);
@@ -105,7 +96,6 @@ public class IncomingTcpConnection extends Thread
DataInputStream in = new DataInputStream(socket.getInputStream());
int maxVersion = in.readInt();
from = CompactEndpointSerializationHelper.deserialize(in);
- boolean compressed = MessagingService.getBits(header, 2, 1) == 1;
if (compressed)
{
@@ -136,28 +126,11 @@ public class IncomingTcpConnection extends Thread
}
}
- private void handleLegacyVersion(int version)
+ private void handleLegacyVersion()
{
throw new UnsupportedOperationException("Unable to read obsolete message version " + version + "; the earliest version supported is 1.2.0");
}
- private void handleStream(DataInputStream input, int version) throws IOException
- {
- if (version == MessagingService.current_version)
- {
- int size = input.readInt();
- byte[] headerBytes = new byte[size];
- input.readFully(headerBytes);
- stream(StreamHeader.serializer.deserialize(new DataInputStream(new FastByteArrayInputStream(headerBytes)), version), input);
- }
- else
- {
- // streaming connections are per-session and have a fixed version. we can't do anything with a wrong-version stream connection, so drop it.
- logger.error("Received stream using protocol version {} (my version {}). Terminating connection",
- version, MessagingService.current_version);
- }
- }
-
private InetAddress receiveMessage(DataInputStream input, int version) throws IOException
{
int id;
@@ -204,9 +177,4 @@ public class IncomingTcpConnection extends Thread
logger.debug("error closing socket", e);
}
}
-
- private void stream(StreamHeader streamHeader, DataInputStream input) throws IOException
- {
- new IncomingStreamReader(streamHeader, socket).read();
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 1569faf..8e94f8e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -17,13 +17,9 @@
*/
package org.apache.cassandra.net;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOError;
-import java.io.IOException;
+import java.io.*;
import java.lang.management.ManagementFactory;
import java.net.*;
-import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
@@ -53,7 +49,6 @@ import org.apache.cassandra.gms.GossipDigestAck;
import org.apache.cassandra.gms.GossipDigestAck2;
import org.apache.cassandra.gms.GossipDigestSyn;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.metrics.ConnectionMetrics;
import org.apache.cassandra.metrics.DroppedMessageMetrics;
@@ -63,7 +58,6 @@ import org.apache.cassandra.service.*;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.PrepareResponse;
import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -92,8 +86,8 @@ public final class MessagingService implements MessagingServiceMBean
REQUEST_RESPONSE, // client-initiated reads and writes
@Deprecated STREAM_INITIATE,
@Deprecated STREAM_INITIATE_DONE,
- STREAM_REPLY,
- STREAM_REQUEST,
+ @Deprecated STREAM_REPLY,
+ @Deprecated STREAM_REQUEST,
RANGE_SLICE,
@Deprecated BOOTSTRAP_TOKEN,
TREE_REQUEST,
@@ -192,8 +186,6 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.MUTATION, RowMutation.serializer);
put(Verb.READ_REPAIR, RowMutation.serializer);
put(Verb.READ, ReadCommand.serializer);
- put(Verb.STREAM_REPLY, StreamReply.serializer);
- put(Verb.STREAM_REQUEST, StreamRequest.serializer);
put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
put(Verb.TREE_REQUEST, ActiveRepairService.TreeRequest.serializer);
@@ -637,34 +629,6 @@ public final class MessagingService implements MessagingServiceMBean
return iar;
}
- /**
- * Stream a file from source to destination. This is highly optimized
- * to not hold any of the contents of the file in memory.
- *
- * @param header Header contains file to stream and other metadata.
- * @param to endpoint to which we need to stream the file.
- */
-
- public void stream(StreamHeader header, InetAddress to)
- {
- DebuggableThreadPoolExecutor executor = streamExecutors.get(to);
- if (executor == null)
- {
- // Using a core pool size of 0 is important. See documentation of streamExecutors.
- executor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("Streaming to " + to, 1, 1, TimeUnit.SECONDS);
- DebuggableThreadPoolExecutor old = streamExecutors.putIfAbsent(to, executor);
- if (old != null)
- {
- executor.shutdown();
- executor = old;
- }
- }
-
- executor.execute(header.file == null || header.file.compressionInfo == null
- ? new FileStreamTask(header, to)
- : new CompressedFileStreamTask(header, to));
- }
-
public void register(ILatencySubscriber subcriber)
{
subscribers.add(subcriber);
@@ -763,44 +727,6 @@ public final class MessagingService implements MessagingServiceMBean
return packed >>> (start + 1) - count & ~(-1 << count);
}
- public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress, int version)
- {
- int header = 0;
- // set compression bit.
- if (compress)
- header |= 4;
- // set streaming bit
- header |= 8;
- // Setting up the version bit
- header |= (version << 8);
-
- /* Adding the StreamHeader which contains the session Id along
- * with the pendingfile info for the stream.
- * | Session Id | Pending File Size | Pending File | Bool more files |
- * | No. of Pending files | Pending Files ... |
- */
- byte[] bytes;
- try
- {
- DataOutputBuffer buffer = new DataOutputBuffer();
- StreamHeader.serializer.serialize(streamHeader, buffer, version);
- bytes = buffer.getData();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- assert bytes.length > 0;
-
- ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + 4 + bytes.length);
- buffer.putInt(PROTOCOL_MAGIC);
- buffer.putInt(header);
- buffer.putInt(bytes.length);
- buffer.put(bytes);
- buffer.flip();
- return buffer;
- }
-
/**
* @return the last version associated with address, or @param version if this is the first such version
*/
@@ -885,9 +811,29 @@ public final class MessagingService implements MessagingServiceMBean
{
Socket socket = server.accept();
if (authenticate(socket))
- new IncomingTcpConnection(socket).start();
+ {
+ // determine the connection type to decide whether to buffer
+ DataInputStream in = new DataInputStream(socket.getInputStream());
+ MessagingService.validateMagic(in.readInt());
+ int header = in.readInt();
+ boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
+ int version = MessagingService.getBits(header, 15, 8);
+ logger.debug("Connection version {} from {}", version, socket.getInetAddress());
+
+ if (isStream)
+ {
+ new IncomingStreamingConnection(version, socket).start();
+ }
+ else
+ {
+ boolean compressed = MessagingService.getBits(header, 2, 1) == 1;
+ new IncomingTcpConnection(version, compressed, socket).start();
+ }
+ }
else
+ {
socket.close();
+ }
}
catch (AsynchronousCloseException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f6becd0..5e30207 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -37,6 +37,8 @@ import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
import com.google.common.util.concurrent.AtomicDouble;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.log4j.Level;
@@ -214,6 +216,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
jmxObjectName = new ObjectName("org.apache.cassandra.db:type=StorageService");
mbs.registerMBean(this, jmxObjectName);
+ mbs.registerMBean(StreamManager.instance, new ObjectName(StreamManager.OBJECT_NAME));
}
catch (Exception e)
{
@@ -232,8 +235,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_COMMIT, new CommitVerbHandler());
// see BootStrapper for a summary of how the bootstrap verbs interact
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.STREAM_REQUEST, new StreamRequestVerbHandler());
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.STREAM_REPLY, new StreamReplyVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REPLICATION_FINISHED, new ReplicationFinishedVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.REQUEST_RESPONSE, new ResponseVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.INTERNAL_RESPONSE, new ResponseVerbHandler());
@@ -253,10 +254,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT, new SnapshotVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, new EchoVerbHandler());
-
- // spin up the streaming service so it is available for jmx tools.
- if (StreamingService.instance == null)
- throw new RuntimeException("Streaming service is unavailable.");
}
public void registerDaemon(CassandraDaemon daemon)
@@ -792,7 +789,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc);
- RangeStreamer streamer = new RangeStreamer(tokenMetadata, FBUtilities.getBroadcastAddress(), OperationType.REBUILD);
+ RangeStreamer streamer = new RangeStreamer(tokenMetadata, FBUtilities.getBroadcastAddress(), "Rebuild");
streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
if (sourceDc != null)
streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
@@ -800,7 +797,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
for (String table : Schema.instance.getNonSystemTables())
streamer.addRanges(table, getLocalRanges(table));
- streamer.fetch();
+ try
+ {
+ streamer.fetchAsync().get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException("Interrupted while waiting on rebuild streaming");
+ }
+ catch (ExecutionException e)
+ {
+ // This is used exclusively through JMX, so log the full trace but only throw a simple RTE
+ logger.error("Error while rebuilding node", e.getCause());
+ throw new RuntimeException("Error while rebuilding node: " + e.getCause().getMessage());
+ }
}
public void setStreamThroughputMbPerSec(int value)
@@ -1775,7 +1785,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
private void restoreReplicaCount(InetAddress endpoint, final InetAddress notifyEndpoint)
{
- final Multimap<InetAddress, String> fetchSources = HashMultimap.create();
Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> rangesToFetch = HashMultimap.create();
final InetAddress myAddress = FBUtilities.getBroadcastAddress();
@@ -1792,40 +1801,37 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
Multimap<InetAddress, Range<Token>> sourceRanges = getNewSourceRanges(table, myNewRanges);
for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : sourceRanges.asMap().entrySet())
{
- fetchSources.put(entry.getKey(), table);
rangesToFetch.put(table, entry);
}
}
+ StreamPlan stream = new StreamPlan("Restore replica count");
for (final String table : rangesToFetch.keySet())
{
for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangesToFetch.get(table))
{
final InetAddress source = entry.getKey();
Collection<Range<Token>> ranges = entry.getValue();
- final IStreamCallback callback = new IStreamCallback()
- {
- public void onSuccess()
- {
- synchronized (fetchSources)
- {
- fetchSources.remove(source, table);
- if (fetchSources.isEmpty())
- sendReplicationNotification(notifyEndpoint);
- }
- }
-
- public void onFailure()
- {
- logger.warn("Streaming from " + source + " failed");
- onSuccess(); // calling onSuccess to send notification
- }
- };
if (logger.isDebugEnabled())
logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
- StreamIn.requestRanges(source, table, ranges, callback, OperationType.RESTORE_REPLICA_COUNT);
+ stream.requestRanges(source, table, ranges);
}
}
+ StreamResultFuture future = stream.execute();
+ Futures.addCallback(future, new FutureCallback<StreamState>()
+ {
+ public void onSuccess(StreamState finalState)
+ {
+ sendReplicationNotification(notifyEndpoint);
+ }
+
+ public void onFailure(Throwable t)
+ {
+ logger.warn("Streaming to restore replica count failed", t);
+ // We still want to send the notification
+ sendReplicationNotification(notifyEndpoint);
+ }
+ });
}
// needs to be modified to accept either a table or ARS.
@@ -2694,17 +2700,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
setMode(Mode.LEAVING, "streaming data to other nodes", true);
- CountDownLatch latch = streamRanges(rangesToStream);
- CountDownLatch hintsLatch = streamHints();
+ Future<StreamState> streamSuccess = streamRanges(rangesToStream);
+ Future<StreamState> hintsSuccess = streamHints();
// wait for the transfer runnables to signal the latch.
logger.debug("waiting for stream aks.");
try
{
- latch.await();
- hintsLatch.await();
+ streamSuccess.get();
+ hintsSuccess.get();
}
- catch (InterruptedException e)
+ catch (ExecutionException | InterruptedException e)
{
throw new RuntimeException(e);
}
@@ -2713,10 +2719,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
onFinish.run();
}
- private CountDownLatch streamHints()
+ private Future<StreamState> streamHints()
{
if (HintedHandOffManager.instance.listEndpointsPendingHints().size() == 0)
- return new CountDownLatch(0);
+ return Futures.immediateFuture(null);
// gather all live nodes in the cluster that aren't also leaving
List<InetAddress> candidates = new ArrayList<InetAddress>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
@@ -2731,7 +2737,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (candidates.isEmpty())
{
logger.warn("Unable to stream hints since no live endpoints seen");
- return new CountDownLatch(0);
+ return Futures.immediateFuture(null);
}
else
{
@@ -2743,14 +2749,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
Token token = StorageService.getPartitioner().getMinimumToken();
List<Range<Token>> ranges = Collections.singletonList(new Range<Token>(token, token));
- CountDownLatch latch = new CountDownLatch(1);
- StreamOut.transferRanges(hintsDestinationHost,
- Table.open(Table.SYSTEM_KS),
- Collections.singletonList(Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF)),
- ranges,
- new CountingDownStreamCallback(latch, hintsDestinationHost),
- OperationType.UNBOOTSTRAP);
- return latch;
+ return new StreamPlan("Hints").transferRanges(hintsDestinationHost,
+ Table.SYSTEM_KS,
+ ranges,
+ SystemTable.HINTS_CF)
+ .execute();
}
}
@@ -2812,25 +2815,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (relocator.streamsNeeded())
{
setMode(Mode.MOVING, "fetching new ranges and streaming old ranges", true);
-
- relocator.logStreamsMap("[Move->STREAMING]");
- CountDownLatch streamLatch = relocator.streams();
-
- relocator.logRequestsMap("[Move->FETCHING]");
- CountDownLatch fetchLatch = relocator.requests();
-
try
{
- streamLatch.await();
- fetchLatch.await();
+ relocator.stream().get();
}
- catch (InterruptedException e)
+ catch (ExecutionException | InterruptedException e)
{
- throw new RuntimeException("Interrupted latch while waiting for stream/fetch ranges to finish: " + e.getMessage());
+ throw new RuntimeException("Interrupted while waiting for stream/fetch ranges to finish: " + e.getMessage());
}
}
else
+ {
setMode(Mode.MOVING, "No ranges to fetch/stream", true);
+ }
setTokens(Collections.singleton(newToken)); // setting new token as we have everything settled
@@ -2840,8 +2837,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private class RangeRelocator
{
- private Map<String, Multimap<InetAddress, Range<Token>>> rangesToFetch = new HashMap<String, Multimap<InetAddress, Range<Token>>>();
- private Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByTable = new HashMap<String, Multimap<Range<Token>, InetAddress>>();
+ private StreamPlan streamPlan = new StreamPlan("Bootstrap");
private RangeRelocator(Collection<Token> tokens, List<String> tables)
{
@@ -2856,15 +2852,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// clone to avoid concurrent modification in calculateNaturalEndpoints
TokenMetadata tokenMetaClone = tokenMetadata.cloneOnlyTokenMap();
- for (String table : tables)
+ for (String keyspace : tables)
{
for (Token newToken : newTokens)
{
// replication strategy of the current keyspace (aka table)
- AbstractReplicationStrategy strategy = Table.open(table).getReplicationStrategy();
+ AbstractReplicationStrategy strategy = Table.open(keyspace).getReplicationStrategy();
// getting collection of the currently used ranges by this keyspace
- Collection<Range<Token>> currentRanges = getRangesForEndpoint(table, localAddress);
+ Collection<Range<Token>> currentRanges = getRangesForEndpoint(keyspace, localAddress);
// collection of ranges which this node will serve after move to the new token
Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetadata, newToken, localAddress);
@@ -2895,51 +2891,39 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// calculating endpoints to stream current ranges to if needed
// in some situations node will handle current ranges as part of the new ranges
- Multimap<Range<Token>, InetAddress> rangeWithEndpoints = HashMultimap.create();
-
+ Multimap<InetAddress, Range<Token>> endpointRanges = HashMultimap.create();
for (Range<Token> toStream : rangesPerTable.left)
{
Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaClone));
Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(toStream.right, tokenMetaCloneAllSettled));
logger.debug("Range:" + toStream + "Current endpoints: " + currentEndpoints + " New endpoints: " + newEndpoints);
- rangeWithEndpoints.putAll(toStream, Sets.difference(newEndpoints, currentEndpoints));
+ for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
+ endpointRanges.put(address, toStream);
}
- // associating table with range-to-endpoints map
- rangesToStreamByTable.put(table, rangeWithEndpoints);
+ // stream ranges
+ for (InetAddress address : endpointRanges.keySet())
+ streamPlan.transferRanges(address, keyspace, endpointRanges.get(address));
+ // stream requests
Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints);
- rangesToFetch.put(table, workMap);
+ for (InetAddress address : workMap.keySet())
+ streamPlan.requestRanges(address, keyspace, workMap.get(address));
if (logger.isDebugEnabled())
- logger.debug("Table {}: work map {}.", table, workMap);
+ logger.debug("Table {}: work map {}.", keyspace, workMap);
}
}
}
- private void logStreamsMap(String prefix)
- {
- logger.debug("{} Work map: {}", prefix, rangesToStreamByTable);
- }
-
- private void logRequestsMap(String prefix)
+ public Future<StreamState> stream()
{
- logger.debug("{} Work map: {}", prefix, rangesToFetch);
+ return streamPlan.execute();
}
- private boolean streamsNeeded()
+ public boolean streamsNeeded()
{
- return !rangesToStreamByTable.isEmpty() || !rangesToFetch.isEmpty();
- }
-
- private CountDownLatch streams()
- {
- return streamRanges(rangesToStreamByTable);
- }
-
- private CountDownLatch requests()
- {
- return requestRanges(rangesToFetch);
+ return !streamPlan.isEmpty();
}
}
@@ -2995,25 +2979,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (relocator.streamsNeeded())
{
setMode(Mode.RELOCATING, "fetching new ranges and streaming old ranges", true);
-
- relocator.logStreamsMap("[Relocate->STREAMING]");
- CountDownLatch streamLatch = relocator.streams();
-
- relocator.logRequestsMap("[Relocate->FETCHING]");
- CountDownLatch fetchLatch = relocator.requests();
-
try
{
- streamLatch.await();
- fetchLatch.await();
+ relocator.stream().get();
}
- catch (InterruptedException e)
+ catch (ExecutionException | InterruptedException e)
{
throw new RuntimeException("Interrupted latch while waiting for stream/fetch ranges to finish: " + e.getMessage());
}
}
else
+ {
setMode(Mode.RELOCATING, "no new ranges to stream/fetch", true);
+ }
Collection<Token> currentTokens = SystemTable.updateLocalTokens(tokens, Collections.<Token>emptyList());
tokenMetadata.updateNormalTokens(currentTokens, FBUtilities.getBroadcastAddress());
@@ -3420,26 +3398,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
* Seed data to the endpoints that will be responsible for it at the future
*
* @param rangesToStreamByTable tables and data ranges with endpoints included for each
- * @return latch to count down
+ * @return async Future indicating whether the stream succeeded
*/
- private CountDownLatch streamRanges(final Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByTable)
+ private Future<StreamState> streamRanges(final Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByTable)
{
// First, we build a list of ranges to stream to each host, per table
final Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByTable = new HashMap<String, Map<InetAddress, List<Range<Token>>>>();
- // The number of stream out sessions we need to start, to be built up as we build sessionsToStreamByTable
- int sessionCount = 0;
-
for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry : rangesToStreamByTable.entrySet())
{
+ String keyspace = entry.getKey();
Multimap<Range<Token>, InetAddress> rangesWithEndpoints = entry.getValue();
if (rangesWithEndpoints.isEmpty())
continue;
- final String table = entry.getKey();
-
Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new HashMap<InetAddress, List<Range<Token>>>();
-
for (final Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries())
{
final Range<Token> range = endPointEntry.getKey();
@@ -3454,12 +3427,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
curRanges.add(range);
}
- sessionCount += rangesPerEndpoint.size();
- sessionsToStreamByTable.put(table, rangesPerEndpoint);
+ sessionsToStreamByTable.put(keyspace, rangesPerEndpoint);
}
- final CountDownLatch latch = new CountDownLatch(sessionCount);
-
+ StreamPlan streamPlan = new StreamPlan("Unbootstrap");
for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry : sessionsToStreamByTable.entrySet())
{
final String table = entry.getKey();
@@ -3471,90 +3442,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
final InetAddress newEndpoint = rangesEntry.getKey();
// TODO each call to transferRanges re-flushes, this is potentially a lot of waste
- StreamOut.transferRanges(newEndpoint,
- Table.open(table),
- ranges,
- new CountingDownStreamCallback(latch, newEndpoint),
- OperationType.UNBOOTSTRAP);
- }
- }
- return latch;
- }
-
- static class CountingDownStreamCallback implements IStreamCallback
- {
- private final CountDownLatch latch;
- private final InetAddress targetAddr;
-
- CountingDownStreamCallback(CountDownLatch latch, InetAddress targetAddr)
- {
- this.latch = latch;
- this.targetAddr = targetAddr;
- }
-
- public void onSuccess()
- {
- latch.countDown();
- }
-
- public void onFailure()
- {
- logger.warn("Streaming to " + targetAddr + " failed");
- onSuccess(); // calling onSuccess for latch countdown
- }
- }
-
- /**
- * Used to request ranges from endpoints in the ring (will block until all data is fetched and ready)
- * @param ranges ranges to fetch as map of the preferred address and range collection
- * @return latch to count down
- */
- private CountDownLatch requestRanges(final Map<String, Multimap<InetAddress, Range<Token>>> ranges)
- {
- final CountDownLatch latch = new CountDownLatch(ranges.keySet().size());
- for (Map.Entry<String, Multimap<InetAddress, Range<Token>>> entry : ranges.entrySet())
- {
- Multimap<InetAddress, Range<Token>> endpointWithRanges = entry.getValue();
-
- if (endpointWithRanges.isEmpty())
- {
- latch.countDown();
- continue;
- }
-
- final String table = entry.getKey();
- final Set<InetAddress> pending = new HashSet<InetAddress>(endpointWithRanges.keySet());
-
- // Send messages to respective folks to stream data over to me
- for (final InetAddress source: endpointWithRanges.keySet())
- {
- Collection<Range<Token>> toFetch = endpointWithRanges.get(source);
-
- final IStreamCallback callback = new IStreamCallback()
- {
- public void onSuccess()
- {
- pending.remove(source);
-
- if (pending.isEmpty())
- latch.countDown();
- }
-
- public void onFailure()
- {
- logger.warn("Streaming from " + source + " failed");
- onSuccess(); // calling onSuccess for latch countdown
- }
- };
-
- if (logger.isDebugEnabled())
- logger.debug("Requesting from " + source + " ranges " + StringUtils.join(toFetch, ", "));
-
- // sending actual request
- StreamIn.requestRanges(source, table, toFetch, callback, OperationType.BOOTSTRAP);
+ streamPlan.transferRanges(newEndpoint, table, ranges);
}
}
- return latch;
+ return streamPlan.execute();
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
deleted file mode 100644
index 89fbf5f..0000000
--- a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.streaming;
-
-import java.net.InetAddress;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.*;
-
-public abstract class AbstractStreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
-{
- private static final Logger logger = LoggerFactory.getLogger(AbstractStreamSession.class);
-
- protected final InetAddress host;
- protected final UUID sessionId;
- protected String table;
- protected final IStreamCallback callback;
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
- protected AbstractStreamSession(String table, InetAddress host, UUID sessionId, IStreamCallback callback)
- {
- this.host = host;
- this.sessionId = sessionId;
- this.table = table;
- this.callback = callback;
- Gossiper.instance.register(this);
- FailureDetector.instance.registerFailureDetectionEventListener(this);
- }
-
- public UUID getSessionId()
- {
- return sessionId;
- }
-
- public InetAddress getHost()
- {
- return host;
- }
-
- public void close(boolean success)
- {
- if (!isClosed.compareAndSet(false, true))
- {
- logger.debug("Stream session {} already closed", getSessionId());
- return;
- }
-
- closeInternal(success);
-
- Gossiper.instance.unregister(this);
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-
- logger.debug("closing with status " + success);
- if (callback != null)
- {
- if (success)
- callback.onSuccess();
- else
- callback.onFailure();
- }
- }
-
- protected abstract void closeInternal(boolean success);
-
- public void onJoin(InetAddress endpoint, EndpointState epState) {}
- public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
- public void onAlive(InetAddress endpoint, EndpointState state) {}
- public void onDead(InetAddress endpoint, EndpointState state) {}
-
- public void onRemove(InetAddress endpoint)
- {
- convict(endpoint, Double.MAX_VALUE);
- }
-
- public void onRestart(InetAddress endpoint, EndpointState epState)
- {
- convict(endpoint, Double.MAX_VALUE);
- }
-
- public void convict(InetAddress endpoint, double phi)
- {
- if (!endpoint.equals(getHost()))
- return;
-
- // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost.
- if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
- return;
-
- logger.error("Stream failed because {} died or was restarted/removed (streams may still be active "
- + "in background, but further streams won't be started)", endpoint);
- close(false);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
new file mode 100644
index 0000000..27ea5af
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.messages.StreamInitMessage;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+
+/**
+ * ConnectionHandler manages the incoming/outgoing message exchange for a {@link StreamSession}.
+ *
+ * <p>
+ * Internally, ConnectionHandler manages one thread that receives incoming {@link StreamMessage}s and one
+ * thread that sends outgoing messages. Messages are encoded/decoded on those threads, and received messages are handed to
+ * {@link StreamSession#messageReceived(org.apache.cassandra.streaming.messages.StreamMessage)}.
+ */
+public class ConnectionHandler
+{
+ private static final Logger logger = LoggerFactory.getLogger(ConnectionHandler.class);
+
+ private static final int MAX_CONNECT_ATTEMPTS = 3;
+
+ private final StreamSession session;
+ private final int protocolVersion;
+
+ private IncomingMessageHandler incoming;
+ private OutgoingMessageHandler outgoing;
+
+ private boolean connected = false;
+ private Socket socket;
+
+ /**
+  * Creates a handler that does not have a socket yet; {@link #connect()} opens one.
+  * The protocol version is fixed to {@link StreamMessage#CURRENT_VERSION}.
+  *
+  * @param session stream session this handler exchanges messages for
+  */
+ ConnectionHandler(StreamSession session)
+ {
+     this.protocolVersion = StreamMessage.CURRENT_VERSION;
+     this.session = session;
+ }
+
+ /**
+  * Creates a handler on top of an already existing socket.
+  * The connected flag is initialised from {@link Socket#isConnected()}.
+  *
+  * @param session stream session this handler exchanges messages for
+  * @param socket socket to use, must not be null
+  * @param protocolVersion streaming protocol version used to encode/decode messages
+  * @throws NullPointerException if {@code socket} is null
+  */
+ ConnectionHandler(StreamSession session, Socket socket, int protocolVersion)
+ {
+     Preconditions.checkNotNull(socket);
+     this.session = session;
+     this.protocolVersion = protocolVersion;
+     this.socket = socket;
+     this.connected = socket.isConnected();
+ }
+
+ /**
+  * Connect to the peer and start exchanging messages.
+  *
+  * <p>
+  * A new socket to {@code session.peer} is obtained from the {@link MessagingService} connection pool and
+  * configured with the streaming socket timeout. When an attempt fails, it is retried up to
+  * MAX_CONNECT_ATTEMPTS times in total, waiting {@code rpcTimeout * 2^attempts} milliseconds in between.
+  * Once a socket is available, the stream init message is written to it, the handler is marked as
+  * connected, the messaging threads are started and {@code session.onConnect()} is invoked.
+  *
+  * @throws IOException when every connection attempt failed, when the thread was interrupted while
+  *                     waiting for a retry, or when the stream init message could not be written.
+  */
+ public void connect() throws IOException
+ {
+     int attempts = 0;
+     while (true)
+     {
+         Socket candidate = null;
+         try
+         {
+             candidate = MessagingService.instance().getConnectionPool(session.peer).newSocket();
+             candidate.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
+             socket = candidate;
+             break;
+         }
+         catch (IOException e)
+         {
+             // a socket that was opened but could not be configured must not leak across retries
+             if (candidate != null)
+             {
+                 try
+                 {
+                     candidate.close();
+                 }
+                 catch (IOException ignored)
+                 {
+                     // best effort, the original failure is the one worth reporting
+                 }
+             }
+
+             if (++attempts >= MAX_CONNECT_ATTEMPTS)
+                 throw e;
+
+             long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
+             logger.warn("Failed attempt " + attempts + " to connect to " + session.peer + ". Retrying in " + waitms + " ms. (" + e + ")");
+             try
+             {
+                 Thread.sleep(waitms);
+             }
+             catch (InterruptedException wtf)
+             {
+                 // keep the interrupt status visible to callers further up the stack
+                 Thread.currentThread().interrupt();
+                 throw new IOException("interrupted", wtf);
+             }
+         }
+     }
+     // send stream init message
+     SocketChannel channel = socket.getChannel();
+     WritableByteChannel out = channel;
+     // socket channel is null when encrypted(SSL)
+     if (channel == null)
+     {
+         out = Channels.newChannel(socket.getOutputStream());
+     }
+     logger.debug("Sending stream init...");
+     StreamInitMessage message = new StreamInitMessage(session.planId(), session.description());
+     out.write(message.createMessage(false, protocolVersion));
+
+     connected = true;
+
+     start();
+     session.onConnect();
+ }
+
+ /**
+  * Asks both message handlers to terminate and closes the socket if the connection was never
+  * established.
+  *
+  * <p>
+  * The handlers are only created by {@link #start()}, so they are null when this is called after a
+  * failed {@link #connect()}; in that case only the socket is handled.
+  */
+ public void close()
+ {
+     // handlers exist only once start() has run
+     if (incoming != null)
+         incoming.terminate();
+     if (outgoing != null)
+         outgoing.terminate();
+
+     // NOTE(review): the socket is closed here only when the handler never reached the connected
+     // state; nothing in this class closes an established socket. Confirm that is intended.
+     if (socket != null && !isConnected())
+     {
+         try
+         {
+             socket.close();
+         }
+         catch (IOException ignored)
+         {
+             // best effort: nothing useful can be done if closing fails
+         }
+     }
+ }
+
+ /**
+  * Creates the incoming and outgoing message handlers on top of the socket and starts one thread
+  * for each of them.
+  *
+  * @throws IOException if the socket streams cannot be obtained
+  */
+ public void start() throws IOException
+ {
+     final ReadableByteChannel readSide;
+     final WritableByteChannel writeSide;
+
+     SocketChannel socketChannel = socket.getChannel();
+     if (socketChannel != null)
+     {
+         readSide = socketChannel;
+         writeSide = socketChannel;
+     }
+     else
+     {
+         // socket channel is null when encrypted(SSL): wrap the plain streams instead
+         readSide = Channels.newChannel(socket.getInputStream());
+         writeSide = Channels.newChannel(socket.getOutputStream());
+     }
+
+     incoming = new IncomingMessageHandler(session, protocolVersion, readSide);
+     outgoing = new OutgoingMessageHandler(session, protocolVersion, writeSide);
+
+     // ready to send/receive files
+     Thread receiver = new Thread(incoming, "STREAM-IN-" + session.peer);
+     receiver.start();
+     Thread sender = new Thread(outgoing, "STREAM-OUT-" + session.peer);
+     sender.start();
+ }
+
+ /**
+  * @return the connected flag: set from the socket state when a socket was supplied at construction,
+  *         or set to true by {@link #connect()} after the stream init message has been written
+  */
+ public boolean isConnected()
+ {
+     return this.connected;
+ }
+
+ /**
+  * Enqueues every given message for sending, in iteration order, via
+  * {@link #sendMessage(StreamMessage)}.
+  *
+  * @param messages messages to send
+  */
+ public void sendMessages(Collection<? extends StreamMessage> messages)
+ {
+     for (StreamMessage each : messages)
+     {
+         sendMessage(each);
+     }
+ }
+
+ /**
+  * Enqueues a single message on the outgoing handler.
+  * The handler is expected to be connected (checked by assertion only).
+  *
+  * @param message message to send
+  */
+ public void sendMessage(StreamMessage message)
+ {
+     assert isConnected();
+     this.outgoing.enqueue(message);
+ }
+
+ /**
+  * Common base of the incoming and outgoing handlers: keeps the session, the protocol version
+  * and a volatile flag that tells the handler loop to stop.
+  */
+ abstract static class MessageHandler implements Runnable
+ {
+     private volatile boolean terminated;
+     protected final int protocolVersion;
+     protected final StreamSession session;
+
+     protected MessageHandler(StreamSession session, int protocolVersion)
+     {
+         this.protocolVersion = protocolVersion;
+         this.session = session;
+     }
+
+     /**
+      * @return true once {@link #terminate()} has been called
+      */
+     public boolean terminated()
+     {
+         return this.terminated;
+     }
+
+     /**
+      * Marks this handler as terminated.
+      */
+     public void terminate()
+     {
+         this.terminated = true;
+     }
+ }
+
+ /**
+  * Incoming streaming message handler: reads messages from the channel and passes them to the
+  * session until it is terminated.
+  */
+ static class IncomingMessageHandler extends MessageHandler
+ {
+     private final ReadableByteChannel in;
+
+     IncomingMessageHandler(StreamSession session, int protocolVersion, ReadableByteChannel in)
+     {
+         super(session, protocolVersion);
+         this.in = in;
+     }
+
+     public void run()
+     {
+         while (!terminated())
+             receiveOne();
+     }
+
+     /**
+      * Deserializes one message and hands it to the session.
+      * A SocketException terminates the handler; any other failure is reported to the session.
+      */
+     private void receiveOne()
+     {
+         try
+         {
+             StreamMessage received = StreamMessage.deserialize(in, protocolVersion, session);
+             assert received != null;
+             session.messageReceived(received);
+         }
+         catch (SocketException e)
+         {
+             // socket is closed
+             terminate();
+         }
+         catch (Throwable e)
+         {
+             session.onError(e);
+         }
+     }
+ }
+
+ /**
+  * Outgoing message handler: takes queued messages, highest priority first, and serializes them
+  * to the channel until it is terminated.
+  */
+ static class OutgoingMessageHandler extends MessageHandler
+ {
+     /*
+      * All outgoing messages are queued up into messageQueue.
+      * The size will grow when a streaming request is received.
+      *
+      * The queue is a priority queue so that messages with a higher priority go out first.
+      */
+     private final PriorityBlockingQueue<StreamMessage> messageQueue = new PriorityBlockingQueue<>(64, new Comparator<StreamMessage>()
+     {
+         public int compare(StreamMessage o1, StreamMessage o2)
+         {
+             // descending by priority; Integer.compare cannot overflow the way a subtraction can
+             return Integer.compare(o2.getPriority(), o1.getPriority());
+         }
+     });
+
+     private final WritableByteChannel out;
+
+     OutgoingMessageHandler(StreamSession session, int protocolVersion, WritableByteChannel out)
+     {
+         super(session, protocolVersion);
+         this.out = out;
+     }
+
+     /**
+      * Adds a message to the outgoing queue.
+      *
+      * @param message message to send
+      */
+     public void enqueue(StreamMessage message)
+     {
+         messageQueue.put(message);
+     }
+
+     public void run()
+     {
+         while (!terminated())
+         {
+             try
+             {
+                 // wait at most a second so that the terminated flag is re-checked regularly
+                 StreamMessage next = messageQueue.poll(1, TimeUnit.SECONDS);
+                 if (next != null)
+                 {
+                     logger.debug("Sending {}", next);
+                     StreamMessage.serialize(next, out, protocolVersion, session);
+                     // nothing more is sent once the session failure has been announced
+                     if (next.type == StreamMessage.Type.SESSION_FAILED)
+                         terminate();
+                 }
+             }
+             catch (SocketException e)
+             {
+                 session.onError(e);
+                 terminate();
+             }
+             catch (InterruptedException | IOException e)
+             {
+                 session.onError(e);
+             }
+         }
+     }
+ }
+}