You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/05/11 09:29:29 UTC

cassandra git commit: Only stream from unrepaired sstables during incremental repair

Repository: cassandra
Updated Branches:
  refs/heads/trunk 16bf51211 -> a5b90f15c


Only stream from unrepaired sstables during incremental repair

Patch by marcuse; reviewed by yukim for CASSANDRA-8267


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5b90f15
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5b90f15
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5b90f15

Branch: refs/heads/trunk
Commit: a5b90f15c53e256bff4ad382745e34a739a5983a
Parents: 16bf512
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Dec 8 15:17:51 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon May 11 09:29:09 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                             |  1 +
 src/java/org/apache/cassandra/db/ColumnFamilyStore.java | 11 +++++++++--
 .../org/apache/cassandra/io/sstable/SSTableLoader.java  |  2 +-
 .../cassandra/net/IncomingStreamingConnection.java      |  2 +-
 src/java/org/apache/cassandra/repair/LocalSyncTask.java |  9 ++++++++-
 .../apache/cassandra/repair/StreamingRepairTask.java    |  9 ++++++++-
 .../apache/cassandra/streaming/ConnectionHandler.java   |  3 ++-
 .../apache/cassandra/streaming/StreamCoordinator.java   |  8 +++++---
 src/java/org/apache/cassandra/streaming/StreamPlan.java |  8 ++++----
 .../apache/cassandra/streaming/StreamResultFuture.java  |  9 +++++----
 .../org/apache/cassandra/streaming/StreamSession.java   | 12 ++++++++++--
 .../cassandra/streaming/messages/StreamInitMessage.java |  9 +++++++--
 .../org/apache/cassandra/dht/StreamStateStoreTest.java  |  4 ++--
 .../cassandra/streaming/StreamTransferTaskTest.java     |  2 +-
 14 files changed, 64 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f2f12c4..bff5970 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Only stream from unrepaired sstables with incremental repair (CASSANDRA-8267)
  * Aggregate UDFs allow SFUNC return type to differ from STYPE if FFUNC specified (CASSANDRA-9321)
  * Failure detector detects and ignores local pauses (CASSANDRA-9183)
  * Remove Thrift dependencies in bundled tools (CASSANDRA-8358)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 26a430a..fec3afc 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1816,7 +1816,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return a ViewFragment containing the sstables and memtables that may need to be merged
      * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree.
      */
-    public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection)
+    public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired)
     {
         return new Function<DataTracker.View, List<SSTableReader>>()
         {
@@ -1824,8 +1824,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             {
                 Set<SSTableReader> sstables = Sets.newHashSet();
                 for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
-                    sstables.addAll(view.sstablesInBounds(rowBounds));
+                {
+                    for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+                    {
+                        if (includeRepaired || !sstable.isRepaired())
+                            sstables.add(sstable);
+                    }
+                }
 
+                logger.debug("ViewFilter for {}/{} sstables", sstables.size(), getSSTables().size());
                 return ImmutableList.copyOf(sstables);
             }
         };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 6991958..910cdcc 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -156,7 +156,7 @@ public class SSTableLoader implements StreamEventHandler
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
-        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false).connectionFactory(client.getConnectionFactory());
+        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false).connectionFactory(client.getConnectionFactory());
 
         Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index d00e4b8..274e47b 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -66,7 +66,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
             // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing.
             // Note: we cannot use the same socket for incoming and outgoing streams because we want to
             // parallelize said streams and the socket is blocking, so we might deadlock.
-            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version, init.keepSSTableLevel);
+            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 010c959..daace01 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamEvent;
 import org.apache.cassandra.streaming.StreamEventHandler;
@@ -65,8 +66,14 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
 
         String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst);
         logger.info("[repair #{}] {}", desc.sessionId, message);
+        boolean isIncremental = false;
+        if (desc.parentSessionId != null)
+        {
+            ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
+            isIncremental = prs.isIncremental;
+        }
         Tracing.traceRepair(message);
-        new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
+        new StreamPlan("Repair", repairedAt, 1, false, isIncremental).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote node
                                             .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index cbf0580..25ef06e 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SyncComplete;
 import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.StreamEvent;
 import org.apache.cassandra.streaming.StreamEventHandler;
 import org.apache.cassandra.streaming.StreamPlan;
@@ -55,7 +56,13 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
         InetAddress dest = request.dst;
         InetAddress preferred = SystemKeyspace.getPreferredIP(dest);
         logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst));
-        new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
+        boolean isIncremental = false;
+        if (desc.parentSessionId != null)
+        {
+            ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
+            isIncremental = prs.isIncremental;
+        }
+        new StreamPlan("Repair", repairedAt, 1, false, isIncremental).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote node
                                             .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 780018c..bb27972 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -183,7 +183,8 @@ public class ConnectionHandler
                     session.planId(),
                     session.description(),
                     isForOutgoing,
-                    session.keepSSTableLevel());
+                    session.keepSSTableLevel(),
+                    session.isIncremental());
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
             DataOutputStreamPlus out = getWriteChannel(socket);
             out.write(messageBuf);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 8d0cdce..603366d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -46,12 +46,14 @@ public class StreamCoordinator
     private final int connectionsPerHost;
     private StreamConnectionFactory factory;
     private final boolean keepSSTableLevel;
+    private final boolean isIncremental;
 
-    public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, StreamConnectionFactory factory)
+    public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, boolean isIncremental, StreamConnectionFactory factory)
     {
         this.connectionsPerHost = connectionsPerHost;
         this.factory = factory;
         this.keepSSTableLevel = keepSSTableLevel;
+        this.isIncremental = isIncremental;
     }
 
     public void setConnectionFactory(StreamConnectionFactory factory)
@@ -235,7 +237,7 @@ public class StreamCoordinator
             // create
             if (streamSessions.size() < connectionsPerHost)
             {
-                StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel);
+                StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental);
                 streamSessions.put(++lastReturned, session);
                 return session;
             }
@@ -267,7 +269,7 @@ public class StreamCoordinator
             StreamSession session = streamSessions.get(id);
             if (session == null)
             {
-                session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel);
+                session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental);
                 streamSessions.put(id, session);
             }
             return session;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 0e5cc6f..0d963ed 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -47,19 +47,19 @@ public class StreamPlan
      */
     public StreamPlan(String description)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false);
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false);
     }
 
     public StreamPlan(String description, boolean keepSSTableLevels)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels);
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false);
     }
 
-    public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels)
+    public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels, boolean isIncremental)
     {
         this.description = description;
         this.repairedAt = repairedAt;
-        this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, new DefaultConnectionFactory());
+        this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index ce9518a..99adab0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -69,9 +69,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             set(getCurrentState());
     }
 
-    private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels)
+    private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental)
     {
-        this(planId, description, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory()));
+        this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory()));
     }
 
     static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator)
@@ -102,7 +102,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
                                                                     Socket socket,
                                                                     boolean isForOutgoing,
                                                                     int version,
-                                                                    boolean keepSSTableLevel) throws IOException
+                                                                    boolean keepSSTableLevel,
+                                                                    boolean isIncremental) throws IOException
     {
         StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
         if (future == null)
@@ -110,7 +111,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description);
 
             // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
-            future = new StreamResultFuture(planId, description, keepSSTableLevel);
+            future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental);
             StreamManager.instance.registerReceiving(future);
         }
         future.attachSocket(from, sessionIndex, socket, isForOutgoing, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 5a056c4..09ee3e4 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -145,6 +145,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
 
     private AtomicBoolean isAborted = new AtomicBoolean(false);
     private final boolean keepSSTableLevel;
+    private final boolean isIncremental;
 
     public static enum State
     {
@@ -166,7 +167,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      * @param connecting Actual connecting address
      * @param factory is used for establishing connection
      */
-    public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel)
+    public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, boolean isIncremental)
     {
         this.peer = peer;
         this.connecting = connecting;
@@ -175,6 +176,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         this.handler = new ConnectionHandler(this);
         this.metrics = StreamingMetrics.get(connecting);
         this.keepSSTableLevel = keepSSTableLevel;
+        this.isIncremental = isIncremental;
     }
 
     public UUID planId()
@@ -197,6 +199,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         return keepSSTableLevel;
     }
 
+    public boolean isIncremental()
+    {
+        return isIncremental;
+    }
+
+
     /**
      * Bind this session to report to specific {@link StreamResultFuture} and
      * perform pre-streaming initialization.
@@ -306,7 +314,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
                 for (Range<Token> range : ranges)
                     rowBoundsList.add(Range.makeRowRange(range));
-                refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList)).refs);
+                refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental)).refs);
             }
 
             List<SSTableStreamingSections> sections = new ArrayList<>(refs.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 03ac944..4928039 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -48,8 +48,9 @@ public class StreamInitMessage
     // true if this init message is to connect for outgoing message on receiving side
     public final boolean isForOutgoing;
     public final boolean keepSSTableLevel;
+    public final boolean isIncremental;
 
-    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel)
+    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental)
     {
         this.from = from;
         this.sessionIndex = sessionIndex;
@@ -57,6 +58,7 @@ public class StreamInitMessage
         this.description = description;
         this.isForOutgoing = isForOutgoing;
         this.keepSSTableLevel = keepSSTableLevel;
+        this.isIncremental = isIncremental;
     }
 
     /**
@@ -109,6 +111,7 @@ public class StreamInitMessage
             out.writeUTF(message.description);
             out.writeBoolean(message.isForOutgoing);
             out.writeBoolean(message.keepSSTableLevel);
+            out.writeBoolean(message.isIncremental);
         }
 
         public StreamInitMessage deserialize(DataInput in, int version) throws IOException
@@ -119,7 +122,8 @@ public class StreamInitMessage
             String description = in.readUTF();
             boolean sentByInitiator = in.readBoolean();
             boolean keepSSTableLevel = in.readBoolean();
-            return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel);
+            boolean isIncremental = in.readBoolean();
+            return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel, isIncremental);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
@@ -130,6 +134,7 @@ public class StreamInitMessage
             size += TypeSizes.NATIVE.sizeof(message.description);
             size += TypeSizes.NATIVE.sizeof(message.isForOutgoing);
             size += TypeSizes.NATIVE.sizeof(message.keepSSTableLevel);
+            size += TypeSizes.NATIVE.sizeof(message.isIncremental);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index c8b9f05..86781d9 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -42,7 +42,7 @@ public class StreamStateStoreTest
         Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100"));
 
         InetAddress local = FBUtilities.getBroadcastAddress();
-        StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true);
+        StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, false);
         session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf"), 0);
 
         StreamStateStore store = new StreamStateStore();
@@ -63,7 +63,7 @@ public class StreamStateStoreTest
 
         // add different range within the same keyspace
         Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200"));
-        session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true);
+        session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, false);
         session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf"), 0);
         session.state(StreamSession.State.COMPLETE);
         store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 87b9c38..a0ac870 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -64,7 +64,7 @@ public class StreamTransferTaskTest
         String cf = "Standard1";
 
         InetAddress peer = FBUtilities.getBroadcastAddress();
-        StreamSession session = new StreamSession(peer, peer, null, 0, true);
+        StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
         ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
 
         // create two sstables