You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2012/03/27 22:06:40 UTC
git commit: Add max_streaming_retries instead of relying solely on
the FD. Patch by brandonwilliams and yukim,
reviewed by brandonwilliams for CASSANDRA-4051
Updated Branches:
refs/heads/cassandra-1.1.0 25828cace -> 5bd78edcc
Add max_streaming_retries instead of relying solely on the FD.
Patch by brandonwilliams and yukim, reviewed by brandonwilliams for
CASSANDRA-4051
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5bd78edc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5bd78edc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5bd78edc
Branch: refs/heads/cassandra-1.1.0
Commit: 5bd78edcce19720d2664780323b35898abeacc1b
Parents: 25828ca
Author: Brandon Williams <br...@apache.org>
Authored: Tue Mar 27 15:00:41 2012 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Mar 27 15:06:14 2012 -0500
----------------------------------------------------------------------
src/java/org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 7 +-
.../org/apache/cassandra/dht/RangeStreamer.java | 7 +-
.../apache/cassandra/io/sstable/SSTableLoader.java | 6 +-
.../apache/cassandra/service/StorageService.java | 17 ++-
.../cassandra/streaming/AbstractStreamSession.java | 112 +++++++++++++++
.../cassandra/streaming/IStreamCallback.java | 36 +++++
.../org/apache/cassandra/streaming/StreamIn.java | 4 +-
.../cassandra/streaming/StreamInSession.java | 68 +++++----
.../org/apache/cassandra/streaming/StreamOut.java | 2 +-
.../cassandra/streaming/StreamOutSession.java | 88 ++----------
.../apache/cassandra/streaming/StreamReply.java | 1 +
.../streaming/StreamReplyVerbHandler.java | 5 +-
.../cassandra/streaming/StreamingRepairTask.java | 22 ++--
14 files changed, 244 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index b17efac..2bc34dc 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -88,6 +88,8 @@ public class Config
public Integer compaction_throughput_mb_per_sec = 16;
public Boolean multithreaded_compaction = false;
+ public Integer max_streaming_retries = 3;
+
public Integer stream_throughput_outbound_megabits_per_sec;
public String[] data_file_directories;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1648d57..8c5997f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -665,11 +665,16 @@ public class DatabaseDescriptor
return System.getProperty("cassandra.replace_token", null);
}
- public static String getClusterName()
+ public static String getClusterName()
{
return conf.cluster_name;
}
+ public static int getMaxStreamingRetries()
+ {
+ return conf.max_streaming_retries;
+ }
+
public static String getJobJarLocation()
{
return conf.job_jar_file_location;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 47931f8..dac05cf 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import org.apache.cassandra.streaming.IStreamCallback;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -219,15 +220,17 @@ public class RangeStreamer
final InetAddress source = entry.getValue().getKey();
Collection<Range<Token>> ranges = entry.getValue().getValue();
/* Send messages to respective folks to stream data over to me */
- Runnable callback = new Runnable()
+ IStreamCallback callback = new IStreamCallback()
{
- public void run()
+ public void onSuccess()
{
latch.countDown();
if (logger.isDebugEnabled())
logger.debug(String.format("Removed %s/%s as a %s source; remaining is %s",
source, table, opType, latch.getCount()));
}
+
+ public void onFailure() {}
};
if (logger.isDebugEnabled())
logger.debug("" + opType + "ing from " + source + " ranges " + StringUtils.join(ranges, ", "));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 1ee7a2f..85b5146 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -206,7 +206,7 @@ public class SSTableLoader
return builder.toString();
}
- private class CountDownCallback implements Runnable
+ private class CountDownCallback implements IStreamCallback
{
private final InetAddress endpoint;
private final CountDownLatch latch;
@@ -217,7 +217,7 @@ public class SSTableLoader
this.endpoint = endpoint;
}
- public void run()
+ public void onSuccess()
{
latch.countDown();
outputHandler.debug(String.format("Streaming session to %s completed (waiting on %d outstanding sessions)", endpoint, latch.getCount()));
@@ -226,6 +226,8 @@ public class SSTableLoader
if (latch.getCount() == 0)
client.stop();
}
+
+ public void onFailure() {}
}
public interface OutputHandler
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 986fd81..84c0096 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1490,9 +1490,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
{
final InetAddress source = entry.getKey();
Collection<Range<Token>> ranges = entry.getValue();
- final Runnable callback = new Runnable()
+ final IStreamCallback callback = new IStreamCallback()
{
- public void run()
+ public void onSuccess()
{
synchronized (fetchSources)
{
@@ -1501,6 +1501,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
sendReplicationNotification(myAddress, notifyEndpoint);
}
}
+
+ public void onFailure() {}
};
if (logger_.isDebugEnabled())
logger_.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
@@ -2799,9 +2801,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
final Range<Token> range = endPointEntry.getKey();
final InetAddress newEndpoint = endPointEntry.getValue();
- final Runnable callback = new Runnable()
+ final IStreamCallback callback = new IStreamCallback()
{
- public void run()
+ public void onSuccess()
{
synchronized (pending)
{
@@ -2811,6 +2813,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
latch.countDown();
}
}
+ public void onFailure() {}
};
StageManager.getStage(Stage.STREAM).execute(new Runnable()
@@ -2852,15 +2855,17 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
{
Collection<Range<Token>> toFetch = endpointWithRanges.get(source);
- final Runnable callback = new Runnable()
+ final IStreamCallback callback = new IStreamCallback()
{
- public void run()
+ public void onSuccess()
{
pending.remove(source);
if (pending.isEmpty())
latch.countDown();
}
+
+ public void onFailure() {}
};
if (logger_.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
new file mode 100644
index 0000000..1938e3d
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.*;
+import org.apache.cassandra.utils.Pair;
+
+public abstract class AbstractStreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+{
+ private static final Logger logger = LoggerFactory.getLogger(AbstractStreamSession.class);
+
+ protected String table;
+ protected Pair<InetAddress, Long> context;
+ protected final IStreamCallback callback;
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+ protected AbstractStreamSession(String table, Pair<InetAddress, Long> context, IStreamCallback callback)
+ {
+ this.table = table;
+ this.context = context;
+ this.callback = callback;
+ Gossiper.instance.register(this);
+ FailureDetector.instance.registerFailureDetectionEventListener(this);
+ }
+
+ public long getSessionId()
+ {
+ return context.right;
+ }
+
+ public InetAddress getHost()
+ {
+ return context.left;
+ }
+
+ public void close(boolean success)
+ {
+ if (!isClosed.compareAndSet(false, true))
+ {
+ logger.debug("Stream session {} already closed", getSessionId());
+ return;
+ }
+
+ closeInternal(success);
+
+ Gossiper.instance.unregister(this);
+ FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+
+ logger.debug("closing with status " + success);
+ if (callback != null)
+ {
+ if (success)
+ callback.onSuccess();
+ else
+ callback.onFailure();
+ }
+ }
+
+ protected abstract void closeInternal(boolean success);
+
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!endpoint.equals(getHost()))
+ return;
+
+ // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ logger.error("Stream failed because {} died or was restarted/removed (streams may still be active "
+ + "in background, but further streams won't be started)", endpoint);
+ close(false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/streaming/IStreamCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IStreamCallback.java b/src/java/org/apache/cassandra/streaming/IStreamCallback.java
new file mode 100644
index 0000000..f0d7754
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/IStreamCallback.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+/**
+ * Callback interface for streaming session success/failure.
+ */
+public interface IStreamCallback
+{
+ /**
+ * Called when the stream session has been closed successfully.
+ */
+ public void onSuccess();
+
+ /**
+ * Called when the stream session has been closed as failed (for example when the remote endpoint is convicted or reports a session failure).
+ */
+ public void onFailure();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/streaming/StreamIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamIn.java b/src/java/org/apache/cassandra/streaming/StreamIn.java
index ce6f7ef..0621086 100644
--- a/src/java/org/apache/cassandra/streaming/StreamIn.java
+++ b/src/java/org/apache/cassandra/streaming/StreamIn.java
@@ -48,7 +48,7 @@ public class StreamIn
private static Logger logger = LoggerFactory.getLogger(StreamIn.class);
/** Request ranges for all column families in the given keyspace. */
- public static void requestRanges(InetAddress source, String tableName, Collection<Range<Token>> ranges, Runnable callback, OperationType type)
+ public static void requestRanges(InetAddress source, String tableName, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
{
requestRanges(source, tableName, Table.open(tableName).getColumnFamilyStores(), ranges, callback, type);
}
@@ -56,7 +56,7 @@ public class StreamIn
/**
* Request ranges to be transferred from specific CFs
*/
- public static void requestRanges(InetAddress source, String tableName, Collection<ColumnFamilyStore> columnFamilies, Collection<Range<Token>> ranges, Runnable callback, OperationType type)
+ public static void requestRanges(InetAddress source, String tableName, Collection<ColumnFamilyStore> columnFamilies, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
{
assert ranges.size() > 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index 2e25436..e662a49 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -25,42 +25,40 @@ import java.net.Socket;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingService;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
-import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.OutboundTcpConnection;
import org.apache.cassandra.utils.Pair;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/** each context gets its own StreamInSession. So there may be >1 Session per host */
-public class StreamInSession
+public class StreamInSession extends AbstractStreamSession
{
private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);
private static ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>();
private final Set<PendingFile> files = new NonBlockingHashSet<PendingFile>();
- private final Pair<InetAddress, Long> context;
- private final Runnable callback;
- private String table;
private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
private PendingFile current;
private Socket socket;
+ private volatile int retries;
- private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
+ private StreamInSession(Pair<InetAddress, Long> context, IStreamCallback callback)
{
- this.context = context;
- this.callback = callback;
+ super(null, context, callback);
}
- public static StreamInSession create(InetAddress host, Runnable callback)
+ public static StreamInSession create(InetAddress host, IStreamCallback callback)
{
Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, System.nanoTime());
StreamInSession session = new StreamInSession(context, callback);
@@ -76,9 +74,7 @@ public class StreamInSession
{
StreamInSession possibleNew = new StreamInSession(context, null);
if ((session = sessions.putIfAbsent(context, possibleNew)) == null)
- {
session = possibleNew;
- }
}
return session;
}
@@ -126,8 +122,16 @@ public class StreamInSession
public void retry(PendingFile remoteFile) throws IOException
{
+ retries++;
+ if (retries > DatabaseDescriptor.getMaxStreamingRetries())
+ {
+ logger.error(String.format("Failed streaming session %d from %s while receiving %s", getSessionId(), getHost().toString(), current),
+ new IllegalStateException("Too many retries for " + remoteFile));
+ close(false); // close() runs closeInternal(false) and also unregisters the gossip/failure-detector listeners and fires callback.onFailure()
+ return;
+ }
StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY);
- logger.info("Streaming of file {} from {} failed: requesting a retry.", remoteFile, this);
+ logger.info("Streaming of file {} for {} failed: requesting a retry.", remoteFile, this);
sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost())));
}
@@ -136,7 +140,6 @@ public class StreamInSession
OutboundTcpConnection.write(message, String.valueOf(getSessionId()), new DataOutputStream(socket.getOutputStream()));
}
-
public void closeIfFinished() throws IOException
{
if (files.isEmpty())
@@ -151,7 +154,7 @@ public class StreamInSession
// Acquire the reference (for secondary index building) before submitting the index build,
// so it can't get compacted out of existence in between
if (!sstable.acquireReference())
- throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transfered");
+ throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred");
ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
cfs.addSSTable(sstable);
@@ -189,20 +192,25 @@ public class StreamInSession
socket.close();
}
- if (callback != null)
- callback.run();
- sessions.remove(context);
+ close(true);
}
}
- public long getSessionId()
+ protected void closeInternal(boolean success)
{
- return context.right;
- }
-
- public InetAddress getHost()
- {
- return context.left;
+ sessions.remove(context);
+ if (!success && FailureDetector.instance.isAlive(getHost()))
+ {
+ try
+ {
+ StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FAILURE);
+ MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost());
+ }
+ catch (IOException ex)
+ {
+ logger.error("Error sending streaming session failure notification to " + getHost(), ex);
+ }
+ }
}
/** query method to determine which hosts are streaming to this node. */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
index 710c9ed..d3f37b5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOut.java
@@ -81,7 +81,7 @@ public class StreamOut
/**
* Stream the given ranges to the target endpoint from each CF in the given keyspace.
*/
- public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, Runnable callback, OperationType type)
+ public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
{
StreamOutSession session = StreamOutSession.create(table.name, target, callback);
transferRanges(session, table.getColumnFamilyStores(), ranges, type);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/streaming/StreamOutSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
index 80629b8..7a53c7c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
@@ -22,29 +22,27 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.*;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.Pair;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
/**
* This class manages the streaming of multiple files one after the other.
-*/
-public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+ */
+public class StreamOutSession extends AbstractStreamSession
{
- private static final Logger logger = LoggerFactory.getLogger( StreamOutSession.class );
+ private static final Logger logger = LoggerFactory.getLogger(StreamOutSession.class);
// one host may have multiple stream sessions.
private static final ConcurrentMap<Pair<InetAddress, Long>, StreamOutSession> streams = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamOutSession>();
- public static StreamOutSession create(String table, InetAddress host, Runnable callback)
+ public static StreamOutSession create(String table, InetAddress host, IStreamCallback callback)
{
return create(table, host, System.nanoTime(), callback);
}
@@ -54,7 +52,7 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur
return create(table, host, sessionId, null);
}
- public static StreamOutSession create(String table, InetAddress host, long sessionId, Runnable callback)
+ public static StreamOutSession create(String table, InetAddress host, long sessionId, IStreamCallback callback)
{
Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, sessionId);
StreamOutSession session = new StreamOutSession(table, context, callback);
@@ -69,29 +67,11 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur
private final Map<String, PendingFile> files = new NonBlockingHashMap<String, PendingFile>();
- public final String table;
- private final Pair<InetAddress, Long> context;
- private final Runnable callback;
private volatile String currentFile;
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
- private StreamOutSession(String table, Pair<InetAddress, Long> context, Runnable callback)
- {
- this.table = table;
- this.context = context;
- this.callback = callback;
- Gossiper.instance.register(this);
- FailureDetector.instance.registerFailureDetectionEventListener(this);
- }
-
- public InetAddress getHost()
- {
- return context.left;
- }
- public long getSessionId()
+ private StreamOutSession(String table, Pair<InetAddress, Long> context, IStreamCallback callback)
{
- return context.right;
+ super(table, context, callback);
}
public void addFilesToStream(List<PendingFile> pendingFiles)
@@ -127,33 +107,12 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur
streamFile(iter.next());
}
- public void close()
+ protected void closeInternal(boolean success)
{
- close(true);
- }
-
- private void close(boolean success)
- {
- // Though unlikely, it is possible for close to be called multiple
- // time, if the endpoint die at the exact wrong time for instance.
- if (!isClosed.compareAndSet(false, true))
- {
- logger.debug("StreamOutSession {} already closed", getSessionId());
- return;
- }
-
- Gossiper.instance.unregister(this);
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-
// Release reference on last file (or any uncompleted ones)
for (PendingFile file : files.values())
file.sstable.releaseReference();
streams.remove(context);
- // Instead of just not calling the callback on failure, we could have
- // allow to register a specific callback for failures, but we leave
- // that to a future ticket (likely CASSANDRA-3112)
- if (callback != null && success)
- callback.run();
}
/** convenience method for use when testing */
@@ -204,33 +163,4 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur
logger.debug("Files are {}", StringUtils.join(files.values(), ","));
MessagingService.instance().stream(header, getHost());
}
-
- public void onJoin(InetAddress endpoint, EndpointState epState) {}
- public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
- public void onAlive(InetAddress endpoint, EndpointState state) {}
- public void onDead(InetAddress endpoint, EndpointState state) {}
-
- public void onRemove(InetAddress endpoint)
- {
- convict(endpoint, Double.MAX_VALUE);
- }
-
- public void onRestart(InetAddress endpoint, EndpointState epState)
- {
- convict(endpoint, Double.MAX_VALUE);
- }
-
- public void convict(InetAddress endpoint, double phi)
- {
- if (!endpoint.equals(getHost()))
- return;
-
- // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost.
- if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
- return;
-
- logger.error("StreamOutSession {} failed because {} died or was restarted/removed (streams may still be active "
- + "in background, but further streams won't be started)", endpoint);
- close(false);
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/streaming/StreamReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReply.java b/src/java/org/apache/cassandra/streaming/StreamReply.java
index f97cfee..c9b82a7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReply.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReply.java
@@ -37,6 +37,7 @@ class StreamReply implements MessageProducer
FILE_FINISHED,
FILE_RETRY,
SESSION_FINISHED,
+ SESSION_FAILURE,
}
public static final IVersionedSerializer<StreamReply> serializer = new FileStatusSerializer();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
index ddc0690..e839fd5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
@@ -65,7 +65,10 @@ public class StreamReplyVerbHandler implements IVerbHandler
session.retry();
break;
case SESSION_FINISHED:
- session.close();
+ session.close(true);
+ break;
+ case SESSION_FAILURE:
+ session.close(false);
break;
default:
throw new RuntimeException("Cannot handle FileStatus.Action: " + reply.action);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5bd78edc/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
index ab341ed..af9a059 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
@@ -63,9 +63,9 @@ public class StreamingRepairTask implements Runnable
private final String tableName;
private final String cfName;
private final Collection<Range<Token>> ranges;
- private final Runnable callback;
+ private final IStreamCallback callback;
- private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, Runnable callback)
+ private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, IStreamCallback callback)
{
this.id = id;
this.owner = owner;
@@ -143,14 +143,14 @@ public class StreamingRepairTask implements Runnable
}
}
- private static Runnable makeReplyingCallback(final InetAddress taskOwner, final UUID taskId)
+ private static IStreamCallback makeReplyingCallback(final InetAddress taskOwner, final UUID taskId)
{
- return new Runnable()
+ return new IStreamCallback()
{
// we expect one callback for the receive, and one for the send
private final AtomicInteger outstanding = new AtomicInteger(2);
- public void run()
+ public void onSuccess()
{
if (outstanding.decrementAndGet() > 0)
// waiting on more calls
@@ -165,18 +165,20 @@ public class StreamingRepairTask implements Runnable
throw new IOError(e);
}
}
+
+ public void onFailure() {}
};
}
// wrap a given callback so as to unregister the streaming repair task on completion
- private static Runnable wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask)
+ private static IStreamCallback wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask)
{
- return new Runnable()
+ return new IStreamCallback()
{
// we expect one callback for the receive, and one for the send
private final AtomicInteger outstanding = new AtomicInteger(isLocalTask ? 2 : 1);
- public void run()
+ public void onSuccess()
{
if (outstanding.decrementAndGet() > 0)
// waiting on more calls
@@ -186,6 +188,8 @@ public class StreamingRepairTask implements Runnable
if (callback != null)
callback.run();
}
+
+ public void onFailure() {}
};
}
@@ -253,7 +257,7 @@ public class StreamingRepairTask implements Runnable
logger.info(String.format("[streaming task #%s] task succeeded", task.id));
if (task.callback != null)
- task.callback.run();
+ task.callback.onSuccess();
}
private static void reply(InetAddress remote, UUID taskid) throws IOException