You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/05/11 09:29:29 UTC
cassandra git commit: Only stream from unrepaired sstables during incremental repair
Repository: cassandra
Updated Branches:
refs/heads/trunk 16bf51211 -> a5b90f15c
Only stream from unrepaired sstables during incremental repair
Patch by marcuse; reviewed by yukim for CASSANDRA-8267
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5b90f15
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5b90f15
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5b90f15
Branch: refs/heads/trunk
Commit: a5b90f15c53e256bff4ad382745e34a739a5983a
Parents: 16bf512
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Dec 8 15:17:51 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon May 11 09:29:09 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/ColumnFamilyStore.java | 11 +++++++++--
.../org/apache/cassandra/io/sstable/SSTableLoader.java | 2 +-
.../cassandra/net/IncomingStreamingConnection.java | 2 +-
src/java/org/apache/cassandra/repair/LocalSyncTask.java | 9 ++++++++-
.../apache/cassandra/repair/StreamingRepairTask.java | 9 ++++++++-
.../apache/cassandra/streaming/ConnectionHandler.java | 3 ++-
.../apache/cassandra/streaming/StreamCoordinator.java | 8 +++++---
src/java/org/apache/cassandra/streaming/StreamPlan.java | 8 ++++----
.../apache/cassandra/streaming/StreamResultFuture.java | 9 +++++----
.../org/apache/cassandra/streaming/StreamSession.java | 12 ++++++++++--
.../cassandra/streaming/messages/StreamInitMessage.java | 9 +++++++--
.../org/apache/cassandra/dht/StreamStateStoreTest.java | 4 ++--
.../cassandra/streaming/StreamTransferTaskTest.java | 2 +-
14 files changed, 64 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f2f12c4..bff5970 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Only stream from unrepaired sstables with incremental repair (CASSANDRA-8267)
* Aggregate UDFs allow SFUNC return type to differ from STYPE if FFUNC specified (CASSANDRA-9321)
* Failure detector detects and ignores local pauses (CASSANDRA-9183)
* Remove Thrift dependencies in bundled tools (CASSANDRA-8358)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 26a430a..fec3afc 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1816,7 +1816,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* @return a ViewFragment containing the sstables and memtables that may need to be merged
* for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree.
*/
- public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection)
+ public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired)
{
return new Function<DataTracker.View, List<SSTableReader>>()
{
@@ -1824,8 +1824,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
Set<SSTableReader> sstables = Sets.newHashSet();
for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
- sstables.addAll(view.sstablesInBounds(rowBounds));
+ {
+ for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+ {
+ if (includeRepaired || !sstable.isRepaired())
+ sstables.add(sstable);
+ }
+ }
+ logger.debug("ViewFilter for {}/{} sstables", sstables.size(), getSSTables().size());
return ImmutableList.copyOf(sstables);
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 6991958..910cdcc 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -156,7 +156,7 @@ public class SSTableLoader implements StreamEventHandler
client.init(keyspace);
outputHandler.output("Established connection to initial hosts");
- StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false).connectionFactory(client.getConnectionFactory());
+ StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false).connectionFactory(client.getConnectionFactory());
Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
openSSTables(endpointToRanges);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index d00e4b8..274e47b 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -66,7 +66,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
// The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing.
// Note: we cannot use the same socket for incoming and outgoing streams because we want to
// parallelize said streams and the socket is blocking, so we might deadlock.
- StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version, init.keepSSTableLevel);
+ StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 010c959..daace01 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
@@ -65,8 +66,14 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst);
logger.info("[repair #{}] {}", desc.sessionId, message);
+ boolean isIncremental = false;
+ if (desc.parentSessionId != null)
+ {
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
+ isIncremental = prs.isIncremental;
+ }
Tracing.traceRepair(message);
- new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
+ new StreamPlan("Repair", repairedAt, 1, false, isIncremental).listeners(this)
.flushBeforeTransfer(true)
// request ranges from the remote node
.requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index cbf0580..25ef06e 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.SyncComplete;
import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamPlan;
@@ -55,7 +56,13 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
InetAddress dest = request.dst;
InetAddress preferred = SystemKeyspace.getPreferredIP(dest);
logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst));
- new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
+ boolean isIncremental = false;
+ if (desc.parentSessionId != null)
+ {
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
+ isIncremental = prs.isIncremental;
+ }
+ new StreamPlan("Repair", repairedAt, 1, false, isIncremental).listeners(this)
.flushBeforeTransfer(true)
// request ranges from the remote node
.requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 780018c..bb27972 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -183,7 +183,8 @@ public class ConnectionHandler
session.planId(),
session.description(),
isForOutgoing,
- session.keepSSTableLevel());
+ session.keepSSTableLevel(),
+ session.isIncremental());
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
DataOutputStreamPlus out = getWriteChannel(socket);
out.write(messageBuf);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 8d0cdce..603366d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -46,12 +46,14 @@ public class StreamCoordinator
private final int connectionsPerHost;
private StreamConnectionFactory factory;
private final boolean keepSSTableLevel;
+ private final boolean isIncremental;
- public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, StreamConnectionFactory factory)
+ public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental, StreamConnectionFactory factory)
{
this.connectionsPerHost = connectionsPerHost;
this.factory = factory;
this.keepSSTableLevel = keepSSTableLevel;
+ this.isIncremental = isIncremental;
}
public void setConnectionFactory(StreamConnectionFactory factory)
@@ -235,7 +237,7 @@ public class StreamCoordinator
// create
if (streamSessions.size() < connectionsPerHost)
{
- StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel);
+ StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental);
streamSessions.put(++lastReturned, session);
return session;
}
@@ -267,7 +269,7 @@ public class StreamCoordinator
StreamSession session = streamSessions.get(id);
if (session == null)
{
- session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel);
+ session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental);
streamSessions.put(id, session);
}
return session;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 0e5cc6f..0d963ed 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -47,19 +47,19 @@ public class StreamPlan
*/
public StreamPlan(String description)
{
- this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false);
+ this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false);
}
public StreamPlan(String description, boolean keepSSTableLevels)
{
- this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels);
+ this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false);
}
- public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels)
+ public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels, boolean isIncremental)
{
this.description = description;
this.repairedAt = repairedAt;
- this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, new DefaultConnectionFactory());
+ this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory());
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index ce9518a..99adab0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -69,9 +69,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
set(getCurrentState());
}
- private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels)
+ private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental)
{
- this(planId, description, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory()));
+ this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory()));
}
static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator)
@@ -102,7 +102,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
Socket socket,
boolean isForOutgoing,
int version,
- boolean keepSSTableLevel) throws IOException
+ boolean keepSSTableLevel,
+ boolean isIncremental) throws IOException
{
StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
if (future == null)
@@ -110,7 +111,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
- future = new StreamResultFuture(planId, description, keepSSTableLevel);
+ future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental);
StreamManager.instance.registerReceiving(future);
}
future.attachSocket(from, sessionIndex, socket, isForOutgoing, version);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 5a056c4..09ee3e4 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -145,6 +145,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
private AtomicBoolean isAborted = new AtomicBoolean(false);
private final boolean keepSSTableLevel;
+ private final boolean isIncremental;
public static enum State
{
@@ -166,7 +167,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
* @param connecting Actual connecting address
* @param factory is used for establishing connection
*/
- public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel)
+ public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, boolean isIncremental)
{
this.peer = peer;
this.connecting = connecting;
@@ -175,6 +176,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
this.handler = new ConnectionHandler(this);
this.metrics = StreamingMetrics.get(connecting);
this.keepSSTableLevel = keepSSTableLevel;
+ this.isIncremental = isIncremental;
}
public UUID planId()
@@ -197,6 +199,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber
return keepSSTableLevel;
}
+ public boolean isIncremental()
+ {
+ return isIncremental;
+ }
+
+
/**
* Bind this session to report to specific {@link StreamResultFuture} and
* perform pre-streaming initialization.
@@ -306,7 +314,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
for (Range<Token> range : ranges)
rowBoundsList.add(Range.makeRowRange(range));
- refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList)).refs);
+ refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental)).refs);
}
List<SSTableStreamingSections> sections = new ArrayList<>(refs.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 03ac944..4928039 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -48,8 +48,9 @@ public class StreamInitMessage
// true if this init message is to connect for outgoing message on receiving side
public final boolean isForOutgoing;
public final boolean keepSSTableLevel;
+ public final boolean isIncremental;
- public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel)
+ public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental)
{
this.from = from;
this.sessionIndex = sessionIndex;
@@ -57,6 +58,7 @@ public class StreamInitMessage
this.description = description;
this.isForOutgoing = isForOutgoing;
this.keepSSTableLevel = keepSSTableLevel;
+ this.isIncremental = isIncremental;
}
/**
@@ -109,6 +111,7 @@ public class StreamInitMessage
out.writeUTF(message.description);
out.writeBoolean(message.isForOutgoing);
out.writeBoolean(message.keepSSTableLevel);
+ out.writeBoolean(message.isIncremental);
}
public StreamInitMessage deserialize(DataInput in, int version) throws IOException
@@ -119,7 +122,8 @@ public class StreamInitMessage
String description = in.readUTF();
boolean sentByInitiator = in.readBoolean();
boolean keepSSTableLevel = in.readBoolean();
- return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel);
+ boolean isIncremental = in.readBoolean();
+ return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel, isIncremental);
}
public long serializedSize(StreamInitMessage message, int version)
@@ -130,6 +134,7 @@ public class StreamInitMessage
size += TypeSizes.NATIVE.sizeof(message.description);
size += TypeSizes.NATIVE.sizeof(message.isForOutgoing);
size += TypeSizes.NATIVE.sizeof(message.keepSSTableLevel);
+ size += TypeSizes.NATIVE.sizeof(message.isIncremental);
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index c8b9f05..86781d9 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -42,7 +42,7 @@ public class StreamStateStoreTest
Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100"));
InetAddress local = FBUtilities.getBroadcastAddress();
- StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true);
+ StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, false);
session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf"), 0);
StreamStateStore store = new StreamStateStore();
@@ -63,7 +63,7 @@ public class StreamStateStoreTest
// add different range within the same keyspace
Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200"));
- session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true);
+ session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, false);
session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf"), 0);
session.state(StreamSession.State.COMPLETE);
store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 87b9c38..a0ac870 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -64,7 +64,7 @@ public class StreamTransferTaskTest
String cf = "Standard1";
InetAddress peer = FBUtilities.getBroadcastAddress();
- StreamSession session = new StreamSession(peer, peer, null, 0, true);
+ StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
// create two sstables