You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2018/03/15 23:42:50 UTC

[2/5] cassandra git commit: Abstract streaming for pluggable storage

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
deleted file mode 100644
index 13a3358..0000000
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.messages;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.compress.CompressionInfo;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-/**
- * StreamingFileHeader is appended before sending actual data to describe what it's sending.
- */
-public class FileMessageHeader
-{
-    public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer();
-
-    public final TableId tableId;
-    public UUID planId;
-    public int sessionIndex;
-    public final int sequenceNumber;
-    /** SSTable version */
-    public final Version version;
-
-    /** SSTable format **/
-    public final SSTableFormat.Type format;
-    public final long estimatedKeys;
-    public final List<Pair<Long, Long>> sections;
-    /**
-     * Compression info for SSTable to send. Can be null if SSTable is not compressed.
-     * On sender, this field is always null to avoid holding large number of Chunks.
-     * Use compressionMetadata instead.
-     */
-    public final CompressionInfo compressionInfo;
-    private final CompressionMetadata compressionMetadata;
-    public final long repairedAt;
-    public final UUID pendingRepair;
-    public final int sstableLevel;
-    public final SerializationHeader.Component header;
-    public final InetAddressAndPort sender;
-
-    /* cached size value */
-    private transient final long size;
-
-    private FileMessageHeader(TableId tableId,
-                             InetAddressAndPort sender,
-                             UUID planId,
-                             int sessionIndex,
-                             int sequenceNumber,
-                             Version version,
-                             SSTableFormat.Type format,
-                             long estimatedKeys,
-                             List<Pair<Long, Long>> sections,
-                             CompressionInfo compressionInfo,
-                             long repairedAt,
-                             UUID pendingRepair,
-                             int sstableLevel,
-                             SerializationHeader.Component header)
-    {
-        this.tableId = tableId;
-        this.sender = sender;
-        this.planId = planId;
-        this.sessionIndex = sessionIndex;
-        this.sequenceNumber = sequenceNumber;
-        this.version = version;
-        this.format = format;
-        this.estimatedKeys = estimatedKeys;
-        this.sections = sections;
-        this.compressionInfo = compressionInfo;
-        this.compressionMetadata = null;
-        this.repairedAt = repairedAt;
-        this.pendingRepair = pendingRepair;
-        this.sstableLevel = sstableLevel;
-        this.header = header;
-        this.size = calculateSize();
-    }
-
-    public FileMessageHeader(TableId tableId,
-                             InetAddressAndPort sender,
-                             UUID planId,
-                             int sessionIndex,
-                             int sequenceNumber,
-                             Version version,
-                             SSTableFormat.Type format,
-                             long estimatedKeys,
-                             List<Pair<Long, Long>> sections,
-                             CompressionMetadata compressionMetadata,
-                             long repairedAt,
-                             UUID pendingRepair,
-                             int sstableLevel,
-                             SerializationHeader.Component header)
-    {
-        this.tableId = tableId;
-        this.sender = sender;
-        this.planId = planId;
-        this.sessionIndex = sessionIndex;
-        this.sequenceNumber = sequenceNumber;
-        this.version = version;
-        this.format = format;
-        this.estimatedKeys = estimatedKeys;
-        this.sections = sections;
-        this.compressionInfo = null;
-        this.compressionMetadata = compressionMetadata;
-        this.repairedAt = repairedAt;
-        this.pendingRepair = pendingRepair;
-        this.sstableLevel = sstableLevel;
-        this.header = header;
-        this.size = calculateSize();
-    }
-
-    public boolean isCompressed()
-    {
-        return compressionInfo != null || compressionMetadata != null;
-    }
-
-    /**
-     * @return total file size to transfer in bytes
-     */
-    public long size()
-    {
-        return size;
-    }
-
-    private long calculateSize()
-    {
-        long transferSize = 0;
-        if (compressionInfo != null)
-        {
-            // calculate total length of transferring chunks
-            for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-                transferSize += chunk.length + 4; // 4 bytes for CRC
-        }
-        else if (compressionMetadata != null)
-        {
-            transferSize = compressionMetadata.getTotalSizeForSections(sections);
-        }
-        else
-        {
-            for (Pair<Long, Long> section : sections)
-                transferSize += section.right - section.left;
-        }
-        return transferSize;
-    }
-
-    @Override
-    public String toString()
-    {
-        final StringBuilder sb = new StringBuilder("Header (");
-        sb.append("tableId: ").append(tableId);
-        sb.append(", #").append(sequenceNumber);
-        sb.append(", version: ").append(version);
-        sb.append(", format: ").append(format);
-        sb.append(", estimated keys: ").append(estimatedKeys);
-        sb.append(", transfer size: ").append(size());
-        sb.append(", compressed?: ").append(isCompressed());
-        sb.append(", repairedAt: ").append(repairedAt);
-        sb.append(", pendingRepair: ").append(pendingRepair);
-        sb.append(", level: ").append(sstableLevel);
-        sb.append(')');
-        return sb.toString();
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        FileMessageHeader that = (FileMessageHeader) o;
-        return sequenceNumber == that.sequenceNumber && tableId.equals(that.tableId);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        int result = tableId.hashCode();
-        result = 31 * result + sequenceNumber;
-        return result;
-    }
-
-    public void addSessionInfo(StreamSession session)
-    {
-        planId = session.planId();
-        sessionIndex = session.sessionIndex();
-    }
-
-    static class FileMessageHeaderSerializer
-    {
-        public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException
-        {
-            header.tableId.serialize(out);
-            CompactEndpointSerializationHelper.streamingInstance.serialize(header.sender, out, version);
-            UUIDSerializer.serializer.serialize(header.planId, out, version);
-            out.writeInt(header.sessionIndex);
-            out.writeInt(header.sequenceNumber);
-            out.writeUTF(header.version.toString());
-            out.writeUTF(header.format.name);
-
-            out.writeLong(header.estimatedKeys);
-            out.writeInt(header.sections.size());
-            for (Pair<Long, Long> section : header.sections)
-            {
-                out.writeLong(section.left);
-                out.writeLong(section.right);
-            }
-            // construct CompressionInfo here to avoid holding large number of Chunks on heap.
-            CompressionInfo compressionInfo = null;
-            if (header.compressionMetadata != null)
-                compressionInfo = new CompressionInfo(header.compressionMetadata.getChunksForSections(header.sections), header.compressionMetadata.parameters);
-            CompressionInfo.serializer.serialize(compressionInfo, out, version);
-            out.writeLong(header.repairedAt);
-            out.writeBoolean(header.pendingRepair != null);
-            if (header.pendingRepair != null)
-            {
-                UUIDSerializer.serializer.serialize(header.pendingRepair, out, version);
-            }
-            out.writeInt(header.sstableLevel);
-
-            SerializationHeader.serializer.serialize(header.version, header.header, out);
-            return compressionInfo;
-        }
-
-        public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException
-        {
-            TableId tableId = TableId.deserialize(in);
-            InetAddressAndPort sender = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version);
-            UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
-            int sessionIndex = in.readInt();
-            int sequenceNumber = in.readInt();
-            Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF());
-            SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF());
-
-            long estimatedKeys = in.readLong();
-            int count = in.readInt();
-            List<Pair<Long, Long>> sections = new ArrayList<>(count);
-            for (int k = 0; k < count; k++)
-                sections.add(Pair.create(in.readLong(), in.readLong()));
-            CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version);
-            long repairedAt = in.readLong();
-            UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
-            int sstableLevel = in.readInt();
-            SerializationHeader.Component header =  SerializationHeader.serializer.deserialize(sstableVersion, in);
-
-            return new FileMessageHeader(tableId, sender, planId, sessionIndex, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, pendingRepair, sstableLevel, header);
-        }
-
-        public long serializedSize(FileMessageHeader header, int version)
-        {
-            long size = header.tableId.serializedSize();
-            size += CompactEndpointSerializationHelper.streamingInstance.serializedSize(header.sender, version);
-            size += UUIDSerializer.serializer.serializedSize(header.planId, version);
-            size += TypeSizes.sizeof(header.sessionIndex);
-            size += TypeSizes.sizeof(header.sequenceNumber);
-            size += TypeSizes.sizeof(header.version.toString());
-            size += TypeSizes.sizeof(header.format.name);
-            size += TypeSizes.sizeof(header.estimatedKeys);
-
-            size += TypeSizes.sizeof(header.sections.size());
-            for (Pair<Long, Long> section : header.sections)
-            {
-                size += TypeSizes.sizeof(section.left);
-                size += TypeSizes.sizeof(section.right);
-            }
-            size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
-            size += TypeSizes.sizeof(header.repairedAt);
-            size += TypeSizes.sizeof(header.pendingRepair != null);
-            size += header.pendingRepair != null ? UUIDSerializer.serializer.serializedSize(header.pendingRepair, version) : 0;
-            size += TypeSizes.sizeof(header.sstableLevel);
-
-            size += SerializationHeader.serializer.serializedSize(header.version, header.header);
-
-            return size;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
deleted file mode 100644
index 9f43982..0000000
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.messages;
-
-import java.io.IOException;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.util.DataInputPlus;
-
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.streaming.StreamManager;
-import org.apache.cassandra.streaming.StreamReader;
-import org.apache.cassandra.streaming.StreamReceiveException;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.compress.CompressedStreamReader;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-
-
-/**
- * IncomingFileMessage is used to receive the part(or whole) of a SSTable data file.
- */
-public class IncomingFileMessage extends StreamMessage
-{
-    public static Serializer<IncomingFileMessage> serializer = new Serializer<IncomingFileMessage>()
-    {
-        @SuppressWarnings("resource")
-        public IncomingFileMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException
-        {
-            FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version);
-            session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex);
-            if (session == null)
-                throw new IllegalStateException(String.format("unknown stream session: %s - %d", header.planId, header.sessionIndex));
-            ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(header.tableId);
-            if (cfs == null)
-                throw new StreamReceiveException(session, "CF " + header.tableId + " was dropped during streaming");
-
-            StreamReader reader = !header.isCompressed() ? new StreamReader(header, session)
-                                                         : new CompressedStreamReader(header, session);
-
-            try
-            {
-                return new IncomingFileMessage(reader.read(input), header);
-            }
-            catch (Throwable t)
-            {
-                JVMStabilityInspector.inspectThrowable(t);
-                throw new StreamReceiveException(session, t);
-            }
-        }
-
-        public void serialize(IncomingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session)
-        {
-            throw new UnsupportedOperationException("Not allowed to call serialize on an incoming file");
-        }
-
-        public long serializedSize(IncomingFileMessage message, int version)
-        {
-            throw new UnsupportedOperationException("Not allowed to call serializedSize on an incoming file");
-        }
-    };
-
-    public FileMessageHeader header;
-    public SSTableMultiWriter sstable;
-
-    public IncomingFileMessage(SSTableMultiWriter sstable, FileMessageHeader header)
-    {
-        super(Type.FILE);
-        this.header = header;
-        this.sstable = sstable;
-    }
-
-    @Override
-    public String toString()
-    {
-        String filename = sstable != null ? sstable.getFilename() : null;
-        return "File (" + header + ", file: " + filename + ")";
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
new file mode 100644
index 0000000..e17c3ab
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.util.DataInputPlus;
+
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.streaming.IncomingStream;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamReceiveException;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+/**
+ * IncomingStreamMessage is used to receive an {@link IncomingStream} together with the header that
+ * describes it. It can only be deserialized; serializing one is not supported.
+ */
+public class IncomingStreamMessage extends StreamMessage
+{
+    public static Serializer<IncomingStreamMessage> serializer = new Serializer<IncomingStreamMessage>()
+    {
+        /**
+         * Reads the stream header, resolves the session and table it refers to, and then reads the
+         * stream payload from {@code input}.
+         *
+         * @param input   source of the header and the stream payload
+         * @param version messaging version passed to the header and payload deserialization
+         * @param session ignored; the session is looked up from the sender, plan id and session index in the header
+         * @return the message wrapping the incoming stream that was read and its header
+         * @throws IllegalStateException if no session matches the header
+         * @throws StreamReceiveException if the table no longer exists, or if reading the payload fails
+         * @throws IOException if deserializing the header fails with an I/O error
+         */
+        @SuppressWarnings("resource")
+        public IncomingStreamMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException
+        {
+            StreamMessageHeader header = StreamMessageHeader.serializer.deserialize(input, version);
+            session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex);
+            if (session == null)
+                throw new IllegalStateException(String.format("unknown stream session: %s - %d", header.planId, header.sessionIndex));
+            ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(header.tableId);
+            if (cfs == null)
+                throw new StreamReceiveException(session, "CF " + header.tableId + " was dropped during streaming");
+
+            IncomingStream incomingData = cfs.getStreamManager().prepareIncomingStream(session, header);
+
+            try
+            {
+                // read inside the try block so that a failure while receiving the payload is inspected and
+                // attributed to the session, instead of escaping as a bare exception
+                incomingData.read(input, version);
+                return new IncomingStreamMessage(incomingData, header);
+            }
+            catch (Throwable t)
+            {
+                JVMStabilityInspector.inspectThrowable(t);
+                throw new StreamReceiveException(session, t);
+            }
+        }
+
+        /** Always throws: serializing an incoming stream message is not supported. */
+        public void serialize(IncomingStreamMessage message, DataOutputStreamPlus out, int version, StreamSession session)
+        {
+            throw new UnsupportedOperationException("Not allowed to call serialize on an incoming stream");
+        }
+
+        /** Always throws: an incoming stream message is never serialized, so it has no serialized size. */
+        public long serializedSize(IncomingStreamMessage message, int version)
+        {
+            throw new UnsupportedOperationException("Not allowed to call serializedSize on an incoming stream");
+        }
+    };
+
+    /** Header that was read ahead of the stream payload. */
+    public StreamMessageHeader header;
+    /** The received stream, as prepared by the table's stream manager. */
+    public IncomingStream stream;
+
+    public IncomingStreamMessage(IncomingStream stream, StreamMessageHeader header)
+    {
+        super(Type.STREAM);
+        this.stream = stream;
+        this.header = header;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "IncomingStreamMessage{" +
+               "header=" + header +
+               ", stream=" + stream +
+               '}';
+    }
+
+    /** Two messages are equal when both their headers and their streams are equal. */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        IncomingStreamMessage that = (IncomingStreamMessage) o;
+        return Objects.equals(header, that.header) &&
+               Objects.equals(stream, that.stream);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(header, stream);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
deleted file mode 100644
index 8bbcc05..0000000
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.messages;
-
-import java.io.IOException;
-import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.StreamWriter;
-import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
-import org.apache.cassandra.streaming.compress.CompressionInfo;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.Ref;
-
-/**
- * OutgoingFileMessage is used to transfer the part(or whole) of a SSTable data file.
- */
-public class OutgoingFileMessage extends StreamMessage
-{
-    public static Serializer<OutgoingFileMessage> serializer = new Serializer<OutgoingFileMessage>()
-    {
-        public OutgoingFileMessage deserialize(DataInputPlus in, int version, StreamSession session)
-        {
-            throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing file");
-        }
-
-        public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
-        {
-            message.startTransfer();
-            try
-            {
-                message.serialize(out, version, session);
-                session.fileSent(message.header);
-            }
-            finally
-            {
-                message.finishTransfer();
-            }
-        }
-
-        public long serializedSize(OutgoingFileMessage message, int version)
-        {
-            return 0;
-        }
-    };
-
-    public final FileMessageHeader header;
-    private final Ref<SSTableReader> ref;
-    private final String filename;
-    private boolean completed = false;
-    private boolean transferring = false;
-
-    public OutgoingFileMessage(Ref<SSTableReader> ref, StreamSession session, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, boolean keepSSTableLevel)
-    {
-        super(Type.FILE);
-        this.ref = ref;
-
-        SSTableReader sstable = ref.get();
-        filename = sstable.getFilename();
-        this.header = new FileMessageHeader(sstable.metadata().id,
-                                            FBUtilities.getBroadcastAddressAndPort(),
-                                            session.planId(),
-                                            session.sessionIndex(),
-                                            sequenceNumber,
-                                            sstable.descriptor.version,
-                                            sstable.descriptor.formatType,
-                                            estimatedKeys,
-                                            sections,
-                                            sstable.compression ? sstable.getCompressionMetadata() : null,
-                                            sstable.getRepairedAt(),
-                                            sstable.getPendingRepair(),
-                                            keepSSTableLevel ? sstable.getSSTableLevel() : 0,
-                                            sstable.header.toComponent());
-    }
-
-    public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException
-    {
-        if (completed)
-        {
-            return;
-        }
-
-        CompressionInfo compressionInfo = FileMessageHeader.serializer.serialize(header, out, version);
-        out.flush();
-        final SSTableReader reader = ref.get();
-        StreamWriter writer = compressionInfo == null ?
-                              new StreamWriter(reader, header.sections, session) :
-                              new CompressedStreamWriter(reader, header.sections,
-                                                         compressionInfo, session);
-        writer.write(out);
-    }
-
-    @VisibleForTesting
-    public synchronized void finishTransfer()
-    {
-        transferring = false;
-        //session was aborted mid-transfer, now it's safe to release
-        if (completed)
-        {
-            ref.release();
-        }
-    }
-
-    @VisibleForTesting
-    public synchronized void startTransfer()
-    {
-        if (completed)
-            throw new RuntimeException(String.format("Transfer of file %s already completed or aborted (perhaps session failed?).",
-                                                     filename));
-        transferring = true;
-    }
-
-    public synchronized void complete()
-    {
-        if (!completed)
-        {
-            completed = true;
-            //release only if not transferring
-            if (!transferring)
-            {
-                ref.release();
-            }
-        }
-    }
-
-    @Override
-    public String toString()
-    {
-        return "File (" + header + ", file: " + filename + ")";
-    }
-
-    public String getFilename()
-    {
-        return filename;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
new file mode 100644
index 0000000..263aabd
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.OutgoingStream;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class OutgoingStreamMessage extends StreamMessage
+{
+    public static Serializer<OutgoingStreamMessage> serializer = new Serializer<OutgoingStreamMessage>()
+    {
+        public OutgoingStreamMessage deserialize(DataInputPlus in, int version, StreamSession session)
+        {
+            throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing stream");
+        }
+
+        public void serialize(OutgoingStreamMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
+        {
+            message.startTransfer();
+            try
+            {
+                message.serialize(out, version, session);
+                session.streamSent(message);
+            }
+            finally
+            {
+                message.finishTransfer();
+            }
+        }
+
+        public long serializedSize(OutgoingStreamMessage message, int version)
+        {
+            return 0;
+        }
+    };
+
+    public final StreamMessageHeader header;
+    private final TableId tableId;
+    public final OutgoingStream stream;
+    private boolean completed = false;
+    private boolean transferring = false;
+
+    public OutgoingStreamMessage(TableId tableId, StreamSession session, OutgoingStream stream, int sequenceNumber)
+    {
+        super(Type.STREAM);
+        this.tableId = tableId;
+
+        this.stream = stream;
+        this.header = new StreamMessageHeader(tableId,
+                                              FBUtilities.getBroadcastAddressAndPort(),
+                                              session.planId(),
+                                              session.sessionIndex(),
+                                              sequenceNumber,
+                                              stream.getRepairedAt(),
+                                              stream.getPendingRepair());
+    }
+
+    public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException
+    {
+        if (completed)
+        {
+            return;
+        }
+        StreamMessageHeader.serializer.serialize(header, out, version);
+        stream.write(session, out, version);
+    }
+
+    @VisibleForTesting
+    public synchronized void finishTransfer()
+    {
+        transferring = false;
+        //session was aborted mid-transfer, now it's safe to release
+        if (completed)
+        {
+            stream.finish();
+        }
+    }
+
+    @VisibleForTesting
+    public synchronized void startTransfer()
+    {
+        if (completed)
+            throw new RuntimeException(String.format("Transfer of stream %s already completed or aborted (perhaps session failed?).",
+                                                     stream));
+        transferring = true;
+    }
+
+    public synchronized void complete()
+    {
+        if (!completed)
+        {
+            completed = true;
+            //release only if not transferring
+            if (!transferring)
+            {
+                stream.finish();
+            }
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "OutgoingStreamMessage{" +
+               "header=" + header +
+               ", stream=" + stream +
+               '}';
+    }
+
+    public String getName()
+    {
+        return stream.getName();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index fced133..fbd3e21 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -44,18 +44,16 @@ public class StreamInitMessage extends StreamMessage
     public final UUID planId;
     public final StreamOperation streamOperation;
 
-    public final boolean keepSSTableLevel;
     public final UUID pendingRepair;
     public final PreviewKind previewKind;
 
-    public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
+    public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, UUID pendingRepair, PreviewKind previewKind)
     {
         super(Type.STREAM_INIT);
         this.from = from;
         this.sessionIndex = sessionIndex;
         this.planId = planId;
         this.streamOperation = streamOperation;
-        this.keepSSTableLevel = keepSSTableLevel;
         this.pendingRepair = pendingRepair;
         this.previewKind = previewKind;
     }
@@ -77,7 +75,6 @@ public class StreamInitMessage extends StreamMessage
             out.writeInt(message.sessionIndex);
             UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version);
             out.writeUTF(message.streamOperation.getDescription());
-            out.writeBoolean(message.keepSSTableLevel);
 
             out.writeBoolean(message.pendingRepair != null);
             if (message.pendingRepair != null)
@@ -93,11 +90,10 @@ public class StreamInitMessage extends StreamMessage
             int sessionIndex = in.readInt();
             UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
             String description = in.readUTF();
-            boolean keepSSTableLevel = in.readBoolean();
 
             UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
             PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
-            return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), keepSSTableLevel, pendingRepair, previewKind);
+            return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), pendingRepair, previewKind);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
@@ -106,7 +102,6 @@ public class StreamInitMessage extends StreamMessage
             size += TypeSizes.sizeof(message.sessionIndex);
             size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
             size += TypeSizes.sizeof(message.streamOperation.getDescription());
-            size += TypeSizes.sizeof(message.keepSSTableLevel);
             size += TypeSizes.sizeof(message.pendingRepair != null);
             if (message.pendingRepair != null)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index feeab05..7ab3d34 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -67,7 +67,7 @@ public abstract class StreamMessage
     public enum Type
     {
         PREPARE_SYN(1, 5, PrepareSynMessage.serializer),
-        FILE(2, 0, IncomingFileMessage.serializer, OutgoingFileMessage.serializer),
+        STREAM(2, 0, IncomingStreamMessage.serializer, OutgoingStreamMessage.serializer),
         RECEIVED(3, 4, ReceivedMessage.serializer),
         COMPLETE(5, 1, CompleteMessage.serializer),
         SESSION_FAILED(6, 5, SessionFailedMessage.serializer),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java
new file mode 100644
index 0000000..84cf3a3
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * StreamMessageHeader is sent ahead of the actual stream data to describe what is being sent.
+ */
+public class StreamMessageHeader
+{
+    public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer();
+
+    public final TableId tableId;
+    public UUID planId;
+    public int sessionIndex;
+    public final int sequenceNumber;
+    public final long repairedAt;
+    public final UUID pendingRepair;
+    public final InetAddressAndPort sender;
+
+    public StreamMessageHeader(TableId tableId,
+                               InetAddressAndPort sender,
+                               UUID planId,
+                               int sessionIndex,
+                               int sequenceNumber,
+                               long repairedAt,
+                               UUID pendingRepair)
+    {
+        this.tableId = tableId;
+        this.sender = sender;
+        this.planId = planId;
+        this.sessionIndex = sessionIndex;
+        this.sequenceNumber = sequenceNumber;
+        this.repairedAt = repairedAt;
+        this.pendingRepair = pendingRepair;
+    }
+
+    @Override
+    public String toString()
+    {
+        final StringBuilder sb = new StringBuilder("Header (");
+        sb.append("tableId: ").append(tableId);
+        sb.append(", #").append(sequenceNumber);
+        sb.append(", repairedAt: ").append(repairedAt);
+        sb.append(", pendingRepair: ").append(pendingRepair);
+        sb.append(')');
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        StreamMessageHeader that = (StreamMessageHeader) o;
+        return sequenceNumber == that.sequenceNumber && tableId.equals(that.tableId);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = tableId.hashCode();
+        result = 31 * result + sequenceNumber;
+        return result;
+    }
+
+    public void addSessionInfo(StreamSession session)
+    {
+        planId = session.planId();
+        sessionIndex = session.sessionIndex();
+    }
+
+    static class FileMessageHeaderSerializer
+    {
+        public void serialize(StreamMessageHeader header, DataOutputPlus out, int version) throws IOException
+        {
+            header.tableId.serialize(out);
+            CompactEndpointSerializationHelper.streamingInstance.serialize(header.sender, out, version);
+            UUIDSerializer.serializer.serialize(header.planId, out, version);
+            out.writeInt(header.sessionIndex);
+            out.writeInt(header.sequenceNumber);
+            out.writeLong(header.repairedAt);
+            out.writeBoolean(header.pendingRepair != null);
+            if (header.pendingRepair != null)
+            {
+                UUIDSerializer.serializer.serialize(header.pendingRepair, out, version);
+            }
+        }
+
+        public StreamMessageHeader deserialize(DataInputPlus in, int version) throws IOException
+        {
+            TableId tableId = TableId.deserialize(in);
+            InetAddressAndPort sender = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version);
+            UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+            int sessionIndex = in.readInt();
+            int sequenceNumber = in.readInt();
+            long repairedAt = in.readLong();
+            UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
+
+            return new StreamMessageHeader(tableId, sender, planId, sessionIndex, sequenceNumber, repairedAt, pendingRepair);
+        }
+
+        public long serializedSize(StreamMessageHeader header, int version)
+        {
+            long size = header.tableId.serializedSize();
+            size += CompactEndpointSerializationHelper.streamingInstance.serializedSize(header.sender, version);
+            size += UUIDSerializer.serializer.serializedSize(header.planId, version);
+            size += TypeSizes.sizeof(header.sessionIndex);
+            size += TypeSizes.sizeof(header.sequenceNumber);
+            size += TypeSizes.sizeof(header.repairedAt);
+            size += TypeSizes.sizeof(header.pendingRepair != null);
+            size += header.pendingRepair != null ? UUIDSerializer.serializer.serializedSize(header.pendingRepair, version) : 0;
+
+            return size;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
new file mode 100644
index 0000000..289bb0f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.util.ArrayList;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.serializers.SerializationUtils;
+
+public class CassandraStreamHeaderTest
+{
+    @Test
+    public void serializerTest()
+    {
+        String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)";
+        TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build();
+        CassandraStreamHeader header = new CassandraStreamHeader(BigFormat.latestVersion,
+                                                                 SSTableFormat.Type.BIG,
+                                                                 0,
+                                                                 new ArrayList<>(),
+                                                                 ((CompressionMetadata) null),
+                                                                 0,
+                                                                 SerializationHeader.makeWithoutStats(metadata).toComponent());
+
+        SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
new file mode 100644
index 0000000..8497e71
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.OutgoingStream;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamConnectionFactory;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+
+public class CassandraStreamManagerTest
+{
+    private static final String KEYSPACE = null;
+    private String keyspace = null;
+    private static final String table = "tbl";
+    private static final StreamConnectionFactory connectionFactory = new DefaultConnectionFactory();
+
+    private TableMetadata tbm;
+    private ColumnFamilyStore cfs;
+
+    @BeforeClass
+    public static void setupClass() throws Exception
+    {
+        SchemaLoader.prepareServer();
+    }
+
+    @Before
+    public void createKeyspace() throws Exception
+    {
+        keyspace = String.format("ks_%s", System.currentTimeMillis());
+        tbm = CreateTableStatement.parse(String.format("CREATE TABLE %s (k INT PRIMARY KEY, v INT)", table), keyspace).build();
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), tbm);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(tbm.id);
+    }
+
+    private static StreamSession session(UUID pendingRepair)
+    {
+        try
+        {
+            return new StreamSession(StreamOperation.REPAIR,
+                                     InetAddressAndPort.getByName("127.0.0.1"),
+                                     InetAddressAndPort.getByName("127.0.0.2"),
+                                     connectionFactory,
+                                     0,
+                                     pendingRepair,
+                                     PreviewKind.NONE);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    private SSTableReader createSSTable(Runnable queryable)
+    {
+        Set<SSTableReader> before = cfs.getLiveSSTables();
+        queryable.run();
+        cfs.forceBlockingFlush();
+        Set<SSTableReader> after = cfs.getLiveSSTables();
+
+        Set<SSTableReader> diff = Sets.difference(after, before);
+        return Iterables.getOnlyElement(diff);
+    }
+
+    private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException
+    {
+        Descriptor descriptor = sstable.descriptor;
+        descriptor.getMetadataSerializer().mutateRepaired(descriptor, repairedAt, pendingRepair);
+        sstable.reloadSSTableMetadata();
+
+    }
+
+    private static Set<SSTableReader> sstablesFromStreams(Collection<OutgoingStream> streams)
+    {
+        Set<SSTableReader> sstables = new HashSet<>();
+        for (OutgoingStream stream: streams)
+        {
+            Ref<SSTableReader> ref = CassandraOutgoingFile.fromStream(stream).getRef();
+            sstables.add(ref.get());
+            ref.release();
+        }
+        return sstables;
+    }
+
+    private Set<SSTableReader> getReadersForRange(Range<Token> range)
+    {
+        Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(NO_PENDING_REPAIR),
+                                                                                          Collections.singleton(range),
+                                                                                          NO_PENDING_REPAIR,
+                                                                                          PreviewKind.NONE);
+        return sstablesFromStreams(streams);
+    }
+
+    private Set<SSTableReader> selectReaders(UUID pendingRepair)
+    {
+        IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+        Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
+        Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(pendingRepair), ranges, pendingRepair, PreviewKind.NONE);
+        return sstablesFromStreams(streams);
+    }
+
+    @Test
+    public void incrementalSSTableSelection() throws Exception
+    {
+        // make 4 sstables: 1 unrepaired, 2 pending repair with different repair ids, and 1 repaired
+        SSTableReader sstable1 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (1, 1)", keyspace, table)));
+        SSTableReader sstable2 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (2, 2)", keyspace, table)));
+        SSTableReader sstable3 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (3, 3)", keyspace, table)));
+        SSTableReader sstable4 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (4, 4)", keyspace, table)));
+
+
+        UUID pendingRepair = UUIDGen.getTimeUUID();
+        long repairedAt = System.currentTimeMillis();
+        mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair);
+        mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+        mutateRepaired(sstable4, repairedAt, NO_PENDING_REPAIR);
+
+
+
+        // no pending repair should return all sstables
+        Assert.assertEquals(Sets.newHashSet(sstable1, sstable2, sstable3, sstable4), selectReaders(NO_PENDING_REPAIR));
+
+        // a pending repair arg should only return sstables with the same pending repair id
+        Assert.assertEquals(Sets.newHashSet(sstable2), selectReaders(pendingRepair));
+    }
+
+    @Test
+    public void testSSTableSectionsForRanges() throws Exception
+    {
+        cfs.truncateBlocking();
+
+        createSSTable(() -> {
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (1, 1)", keyspace, table));
+            QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (2, 2)", keyspace, table));
+        });
+
+        Collection<SSTableReader> allSSTables = cfs.getLiveSSTables();
+        Assert.assertEquals(1, allSSTables.size());
+        final Token firstToken = allSSTables.iterator().next().first.getToken();
+        DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
+
+        Set<SSTableReader> sstablesBeforeRewrite = getReadersForRange(new Range<>(firstToken, firstToken));
+        Assert.assertEquals(1, sstablesBeforeRewrite.size());
+        final AtomicInteger checkCount = new AtomicInteger();
+        // needed since we get notified when compaction is done as well - we can't get sections for ranges for obsoleted sstables
+        final AtomicBoolean done = new AtomicBoolean(false);
+        final AtomicBoolean failed = new AtomicBoolean(false);
+        Runnable r = new Runnable()
+        {
+            public void run()
+            {
+                while (!done.get())
+                {
+                    Range<Token> range = new Range<Token>(firstToken, firstToken);
+                    Set<SSTableReader> sstables = getReadersForRange(range);
+                    if (sstables.size() != 1)
+                        failed.set(true);
+                    checkCount.incrementAndGet();
+                    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
+                }
+            }
+        };
+        Thread t = NamedThreadFactory.createThread(r);
+        try
+        {
+            t.start();
+            cfs.forceMajorCompaction();
+            // reset
+        }
+        finally
+        {
+            DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50);
+            done.set(true);
+            t.join(20);
+        }
+        Assert.assertFalse(failed.get());
+        Assert.assertTrue(checkCount.get() >= 2);
+        cfs.truncateBlocking();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index ad5f8f5..8f0a407 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.streaming.DefaultConnectionFactory;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -51,7 +52,7 @@ public class StreamStateStoreTest
         Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100"));
 
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
-        StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null, PreviewKind.NONE);
+        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, local, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
         session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf"));
 
         StreamStateStore store = new StreamStateStore();
@@ -72,7 +73,7 @@ public class StreamStateStoreTest
 
         // add different range within the same keyspace
         Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200"));
-        session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null, PreviewKind.NONE);
+        session = new StreamSession(StreamOperation.BOOTSTRAP, local, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
         session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf"));
         session.state(StreamSession.State.COMPLETE);
         store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index e399c67..8dd8197 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
+import com.google.common.collect.Lists;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -42,6 +43,7 @@ import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.Verifier;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -52,6 +54,7 @@ import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.OutgoingStream;
 import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamOperation;
@@ -248,12 +251,11 @@ public class LegacySSTableTest
         List<Range<Token>> ranges = new ArrayList<>();
         ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));
         ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken()));
-        ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
-        details.add(new StreamSession.SSTableStreamingSections(sstable.ref(),
-                                                               sstable.getPositionsForRanges(ranges),
-                                                               sstable.estimatedKeysForRanges(ranges)));
-        new StreamPlan(StreamOperation.OTHER).transferFiles(FBUtilities.getBroadcastAddressAndPort(), details)
-                                  .execute().get();
+        List<OutgoingStream> streams = Lists.newArrayList(new CassandraOutgoingFile(StreamOperation.OTHER,
+                                                                                    sstable.ref(),
+                                                                                    sstable.getPositionsForRanges(ranges),
+                                                                                    sstable.estimatedKeysForRanges(ranges)));
+        new StreamPlan(StreamOperation.OTHER).transferStreams(FBUtilities.getBroadcastAddressAndPort(), streams).execute().get();
     }
 
     private static void truncateLegacyTables(String legacyVersion) throws Exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 53f5ab3..5c3e8c9 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -774,65 +774,6 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
         validateCFS(cfs);
     }
 
-    @Test
-    public void testSSTableSectionsForRanges() throws IOException, InterruptedException, ExecutionException
-    {
-        Keyspace keyspace = Keyspace.open(KEYSPACE);
-        final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        truncate(cfs);
-
-        cfs.addSSTable(writeFile(cfs, 1000));
-
-        Collection<SSTableReader> allSSTables = cfs.getLiveSSTables();
-        assertEquals(1, allSSTables.size());
-        final Token firstToken = allSSTables.iterator().next().first.getToken();
-        DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
-
-        List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges(
-            Collections.singleton(new Range<Token>(firstToken, firstToken)),
-            Collections.singleton(cfs), null, PreviewKind.NONE);
-        assertEquals(1, sectionsBeforeRewrite.size());
-        for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite)
-            section.ref.release();
-        final AtomicInteger checkCount = new AtomicInteger();
-        // needed since we get notified when compaction is done as well - we can't get sections for ranges for obsoleted sstables
-        final AtomicBoolean done = new AtomicBoolean(false);
-        final AtomicBoolean failed = new AtomicBoolean(false);
-        Runnable r = new Runnable()
-        {
-            public void run()
-            {
-                while (!done.get())
-                {
-                    Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken));
-                    List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), null, PreviewKind.NONE);
-                    if (sections.size() != 1)
-                        failed.set(true);
-                    for (StreamSession.SSTableStreamingSections section : sections)
-                        section.ref.release();
-                    checkCount.incrementAndGet();
-                    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
-                }
-            }
-        };
-        Thread t = NamedThreadFactory.createThread(r);
-        try
-        {
-            t.start();
-            cfs.forceMajorCompaction();
-            // reset
-        }
-        finally
-        {
-            DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50);
-            done.set(true);
-            t.join(20);
-        }
-        assertFalse(failed.get());
-        assertTrue(checkCount.get() >= 2);
-        truncate(cfs);
-    }
-
     /**
      * emulates anticompaction - writing from one source sstable to two new sstables
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/serializers/SerializationUtils.java b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
new file mode 100644
index 0000000..7ce4ec5
--- /dev/null
+++ b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.serializers;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Test helpers that push a value through an {@link IVersionedSerializer} and back again.
+ */
+public class SerializationUtils
+{
+    private SerializationUtils()
+    {
+        // static helpers only; not meant to be instantiated
+    }
+
+    /**
+     * Serializes {@code src} into an in-memory buffer and deserializes it from that buffer.
+     * Asserts that the limit of the output buffer equals the size reported by {@code serializedSize}.
+     *
+     * @param src        the object to round-trip
+     * @param serializer the serializer under test
+     * @param version    the version passed to every serializer call
+     * @return the object read back by {@code serializer.deserialize}
+     * @throws ArithmeticException if the reported serialized size does not fit in an int
+     * @throws AssertionError if the size check fails or the serializer throws an IOException
+     */
+    public static <T> T cycleSerialization(T src, IVersionedSerializer<T> serializer, int version)
+    {
+        // toIntExact fails loudly instead of silently truncating an oversized long
+        int expectedSize = Math.toIntExact(serializer.serializedSize(src, version));
+
+        try (DataOutputBuffer out = new DataOutputBuffer(expectedSize))
+        {
+            serializer.serialize(src, out, version);
+            Assert.assertEquals(expectedSize, out.buffer().limit());
+            try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
+            {
+                return serializer.deserialize(in, version);
+            }
+        }
+        catch (IOException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    /**
+     * Same as {@link #cycleSerialization(Object, IVersionedSerializer, int)}, using
+     * {@link MessagingService#current_version}.
+     */
+    public static <T> T cycleSerialization(T src, IVersionedSerializer<T> serializer)
+    {
+        return cycleSerialization(src, serializer, MessagingService.current_version);
+    }
+
+    /**
+     * Round-trips {@code src} through {@code serializer} at {@code version} and asserts that the
+     * result equals the original according to {@code Assert.assertEquals}.
+     */
+    public static <T> void assertSerializationCycle(T src, IVersionedSerializer<T> serializer, int version)
+    {
+        T dst = cycleSerialization(src, serializer, version);
+        Assert.assertEquals(src, dst);
+    }
+
+    /**
+     * Same as {@link #assertSerializationCycle(Object, IVersionedSerializer, int)}, using
+     * {@link MessagingService#current_version}.
+     */
+    public static <T> void assertSerializationCycle(T src, IVersionedSerializer<T> serializer)
+    {
+        assertSerializationCycle(src, serializer, MessagingService.current_version);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
deleted file mode 100644
index 6abc2a2..0000000
--- a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.streaming;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.UUIDGen;
-
-public class StreamSessionTest
-{
-    private String keyspace = null;
-    private static final String table = "tbl";
-
-    private TableMetadata tbm;
-    private ColumnFamilyStore cfs;
-
-    @BeforeClass
-    public static void setupClass() throws Exception
-    {
-        SchemaLoader.prepareServer();
-    }
-
-    @Before
-    public void createKeyspace() throws Exception
-    {
-        keyspace = String.format("ks_%s", System.currentTimeMillis());
-        tbm = CreateTableStatement.parse(String.format("CREATE TABLE %s (k INT PRIMARY KEY, v INT)", table), keyspace).build();
-        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), tbm);
-        cfs = Schema.instance.getColumnFamilyStoreInstance(tbm.id);
-    }
-
-    private SSTableReader createSSTable(Runnable queryable)
-    {
-        Set<SSTableReader> before = cfs.getLiveSSTables();
-        queryable.run();
-        cfs.forceBlockingFlush();
-        Set<SSTableReader> after = cfs.getLiveSSTables();
-
-        Set<SSTableReader> diff = Sets.difference(after, before);
-        assert diff.size() == 1 : "Expected 1 new sstable, got " + diff.size();
-        return diff.iterator().next();
-    }
-
-    private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException
-    {
-        Descriptor descriptor = sstable.descriptor;
-        descriptor.getMetadataSerializer().mutateRepaired(descriptor, repairedAt, pendingRepair);
-        sstable.reloadSSTableMetadata();
-
-    }
-
-    private Set<SSTableReader> selectReaders(UUID pendingRepair)
-    {
-        IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
-        Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
-        List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(ranges,
-                                                                                                          Lists.newArrayList(cfs),
-                                                                                                          pendingRepair,
-                                                                                                          PreviewKind.NONE);
-        Set<SSTableReader> sstables = new HashSet<>();
-        for (StreamSession.SSTableStreamingSections section: sections)
-        {
-            sstables.add(section.ref.get());
-        }
-        return sstables;
-    }
-
-    @Test
-    public void incrementalSSTableSelection() throws Exception
-    {
-        // make 3 tables, 1 unrepaired, 2 pending repair with different repair ids, and 1 repaired
-        SSTableReader sstable1 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (1, 1)", keyspace, table)));
-        SSTableReader sstable2 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (2, 2)", keyspace, table)));
-        SSTableReader sstable3 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (3, 3)", keyspace, table)));
-        SSTableReader sstable4 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (4, 4)", keyspace, table)));
-
-
-        UUID pendingRepair = UUIDGen.getTimeUUID();
-        long repairedAt = System.currentTimeMillis();
-        mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair);
-        mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
-        mutateRepaired(sstable4, repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
-
-        // no pending repair should return all sstables
-        Assert.assertEquals(Sets.newHashSet(sstable1, sstable2, sstable3, sstable4), selectReaders(ActiveRepairService.NO_PENDING_REPAIR));
-
-        // a pending repair arg should only return sstables with the same pending repair id
-        Assert.assertEquals(Sets.newHashSet(sstable2), selectReaders(pendingRepair));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index ceaaae0..45c917a 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -36,6 +36,7 @@ import junit.framework.Assert;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -43,7 +44,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
+import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Ref;
 
@@ -75,7 +76,7 @@ public class StreamTransferTaskTest
     public void testScheduleTimeout() throws Exception
     {
         InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
-        StreamSession session = new StreamSession(peer, peer, (connectionId, protocolVersion) -> new EmbeddedChannel(), 0, true, UUID.randomUUID(), PreviewKind.ALL);
+        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, peer, (connectionId, protocolVersion) -> new EmbeddedChannel(), 0, UUID.randomUUID(), PreviewKind.ALL);
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
         // create two sstables
@@ -91,7 +92,7 @@ public class StreamTransferTaskTest
         {
             List<Range<Token>> ranges = new ArrayList<>();
             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
-            task.addTransferFile(sstable.selfRef(), 1, sstable.getPositionsForRanges(ranges));
+            task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.selfRef(), sstable.getPositionsForRanges(ranges), 1));
         }
         assertEquals(2, task.getTotalNumberOfFiles());
 
@@ -121,9 +122,9 @@ public class StreamTransferTaskTest
     public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
     {
         InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
-        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, new DefaultConnectionFactory(), false, null, PreviewKind.NONE);
+        StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE);
         StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), streamCoordinator);
-        StreamSession session = new StreamSession(peer, peer, null, 0, true, null, PreviewKind.NONE);
+        StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, peer, null, 0, null, PreviewKind.NONE);
         session.init(future);
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
@@ -143,7 +144,7 @@ public class StreamTransferTaskTest
             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
             Ref<SSTableReader> ref = sstable.selfRef();
             refs.add(ref);
-            task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges));
+            task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref, sstable.getPositionsForRanges(ranges), 1));
         }
         assertEquals(2, task.getTotalNumberOfFiles());
 
@@ -151,10 +152,10 @@ public class StreamTransferTaskTest
         session.transfers.put(TableId.generate(), task);
 
         //make a copy of outgoing file messages, since task is cleared when it's aborted
-        Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+        Collection<OutgoingStreamMessage> files = new LinkedList<>(task.streams.values());
 
         //simulate start transfer
-        for (OutgoingFileMessage file : files)
+        for (OutgoingStreamMessage file : files)
         {
             file.startTransfer();
         }
@@ -169,7 +170,7 @@ public class StreamTransferTaskTest
         }
 
         //simulate finish transfer
-        for (OutgoingFileMessage file : files)
+        for (OutgoingStreamMessage file : files)
         {
             file.finishTransfer();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 16c07a0..575200a 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -34,6 +34,7 @@ import junit.framework.Assert;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -254,13 +255,13 @@ public class StreamingTransferTest
 
     private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
     {
-        StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
+        StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferStreams(LOCAL, makeOutgoingStreams(ranges, Refs.tryRef(Arrays.asList(sstable))));
         streamPlan.execute().get();
 
         //cannot add files after stream session is finished
         try
         {
-            streamPlan.transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
+            streamPlan.transferStreams(LOCAL, makeOutgoingStreams(ranges, Refs.tryRef(Arrays.asList(sstable))));
             fail("Should have thrown exception");
         }
         catch (RuntimeException e)
@@ -269,16 +270,22 @@ public class StreamingTransferTest
         }
     }
 
-    private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> ranges, Refs<SSTableReader> sstables)
+    private Collection<OutgoingStream> makeOutgoingStreams(StreamOperation operation, List<Range<Token>> ranges, Refs<SSTableReader> sstables)
     {
-        ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
+        ArrayList<OutgoingStream> streams = new ArrayList<>();
         for (SSTableReader sstable : sstables)
         {
-            details.add(new StreamSession.SSTableStreamingSections(sstables.get(sstable),
-                                                                   sstable.getPositionsForRanges(ranges),
-                                                                   sstable.estimatedKeysForRanges(ranges)));
+            streams.add(new CassandraOutgoingFile(operation,
+                                                  sstables.get(sstable),
+                                                  sstable.getPositionsForRanges(ranges),
+                                                  sstable.estimatedKeysForRanges(ranges)));
         }
-        return details;
+        return streams;
+    }
+
+    private Collection<OutgoingStream> makeOutgoingStreams(List<Range<Token>> ranges, Refs<SSTableReader> sstables)
+    {
+        return makeOutgoingStreams(StreamOperation.OTHER, ranges, sstables);
     }
 
     private void doTransferTable(boolean transferSSTables) throws Exception
@@ -458,7 +465,7 @@ public class StreamingTransferTest
         // Acquiring references, transferSSTables needs it
         Refs<SSTableReader> refs = Refs.tryRef(Arrays.asList(sstable, sstable2));
         assert refs != null;
-        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, refs)).execute().get();
+        new StreamPlan("StreamingTransferTest").transferStreams(LOCAL, makeOutgoingStreams(ranges, refs)).execute().get();
 
         // confirm that the sstables were transferred and registered and that 2 keys arrived
         ColumnFamilyStore cfstore = Keyspace.open(keyspaceName).getColumnFamilyStore(cfname);
@@ -513,7 +520,7 @@ public class StreamingTransferTest
         if (refs == null)
             throw new AssertionError();
 
-        new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, refs)).execute().get();
+        new StreamPlan("StreamingTransferTest").transferStreams(LOCAL, makeOutgoingStreams(ranges, refs)).execute().get();
 
         // check that only two keys were transferred
         for (Map.Entry<DecoratedKey,String> entry : Arrays.asList(first, last))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
index 617bae1..fd22a65 100644
--- a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
+++ b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.streaming.async;
 
-import java.net.InetSocketAddress;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -64,8 +63,8 @@ public class NettyStreamingMessageSenderTest
         channel = new EmbeddedChannel();
         channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
         UUID pendingRepair = UUID.randomUUID();
-        session = new StreamSession(REMOTE_ADDR, REMOTE_ADDR, (connectionId, protocolVersion) -> null, 0, true, pendingRepair, PreviewKind.ALL);
-        StreamResultFuture future = StreamResultFuture.initReceivingSide(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, true, pendingRepair, session.getPreviewKind());
+        session = new StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR, REMOTE_ADDR, (connectionId, protocolVersion) -> null, 0, pendingRepair, PreviewKind.ALL);
+        StreamResultFuture future = StreamResultFuture.initReceivingSide(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, pendingRepair, session.getPreviewKind());
         session.init(future);
         sender = session.getMessageSender();
         sender.setControlMessageChannel(channel);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org