You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/20 19:07:14 UTC

[2/5] Streaming 2.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingService.java b/src/java/org/apache/cassandra/streaming/StreamingService.java
deleted file mode 100644
index ea22f82..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamingService.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-
-public class StreamingService implements StreamingServiceMBean
-{
-    public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.net:type=StreamingService";
-    public static final StreamingService instance = new StreamingService();
-
-    private StreamingService()
-    {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_OBJECT_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public String getStatus()
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Receiving from:\n");
-        for (InetAddress source : StreamInSession.getSources())
-        {
-            sb.append(String.format(" %s:%n", source.getHostAddress()));
-            for (PendingFile pf : StreamInSession.getIncomingFiles(source))
-            {
-                sb.append(String.format("  %s%n", pf.toString()));
-            }
-        }
-        sb.append("Sending to:%n");
-        for (InetAddress dest : StreamOutSession.getDestinations())
-        {
-            sb.append(String.format(" %s:%n", dest.getHostAddress()));
-            for (PendingFile pf : StreamOutSession.getOutgoingFiles(dest))
-            {
-                sb.append(String.format("  %s%n", pf.toString()));
-            }
-        }
-        return sb.toString();
-    }
-
-    /** hosts receiving outgoing streams. */
-    public Set<InetAddress> getStreamDestinations()
-    {
-        return StreamOutSession.getDestinations();
-    }
-
-    /** outgoing streams */
-    public List<String> getOutgoingFiles(String host) throws IOException
-    {
-        List<String> files = new ArrayList<String>();
-        // first, verify that host is a destination. calling StreamOutManager.get will put it in the collection
-        // leading to false positives in the future.
-        Set<InetAddress> existingDestinations = getStreamDestinations();
-        InetAddress dest = InetAddress.getByName(host);
-        if (!existingDestinations.contains(dest))
-            return files;
-
-        for (PendingFile f : StreamOutSession.getOutgoingFiles(dest))
-            files.add(String.format("%s", f.toString()));
-        return files;
-    }
-
-    /** hosts sending incoming streams */
-    public Set<InetAddress> getStreamSources()
-    {
-        return StreamInSession.getSources();
-    }
-
-    /** details about incoming streams. */
-    public List<String> getIncomingFiles(String host) throws IOException
-    {
-        List<String> files = new ArrayList<String>();
-        for (PendingFile pf : StreamInSession.getIncomingFiles(InetAddress.getByName(host)))
-        {
-            files.add(String.format("%s: %s", pf.desc.ksname, pf.toString()));
-        }
-        return files;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java b/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
deleted file mode 100644
index 8d748f3..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.Set;
-
-public interface StreamingServiceMBean
-{
-    /** hosts recieving outgoing streams */
-    public Set<InetAddress> getStreamDestinations();
-
-    /** outgoing streams */
-    public List<String> getOutgoingFiles(String host) throws IOException;
-
-    /** hosts sending incoming streams. */
-    public Set<InetAddress> getStreamSources();
-
-    /** details about incoming streams */
-    public List<String> getIncomingFiles(String host) throws IOException;
-
-    /** What's currently happening wrt streaming. */
-    public String getStatus();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
deleted file mode 100644
index 07fb765..0000000
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.compress;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.metrics.StreamingMetrics;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.FileStreamTask;
-import org.apache.cassandra.streaming.StreamHeader;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * FileStreamTask for compressed SSTable.
- *
- * This class sends relevant part of compressed file directly using nio if available.
- */
-public class CompressedFileStreamTask extends FileStreamTask
-{
-    private static final Logger logger = LoggerFactory.getLogger(CompressedFileStreamTask.class);
-    // 10MB chunks
-    public static final int CHUNK_SIZE = 10*1024*1024;
-
-    public CompressedFileStreamTask(StreamHeader header, InetAddress to)
-    {
-        super(header, to);
-    }
-
-    protected void stream() throws IOException
-    {
-        assert header.file.compressionInfo != null;
-
-        SocketChannel sc = socket.getChannel();
-        byte[] transferBuffer = null;
-
-        // write header
-        ByteBuffer headerBuffer = MessagingService.instance().constructStreamHeader(header, false, MessagingService.instance().getVersion(to));
-        socket.getOutputStream().write(ByteBufferUtil.getArray(headerBuffer));
-
-        RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()));
-        FileChannel fc = file.getChannel();
-
-        StreamingMetrics.activeStreamsOutbound.inc();
-        // calculate chunks to transfer. we want to send continuous chunks altogether.
-        List<Pair<Long, Long>> sections = getTransferSections(header.file.compressionInfo.chunks);
-        try
-        {
-            long totalBytesTransferred = 0;
-            // stream each of the required sections of the file
-            for (Pair<Long, Long> section : sections)
-            {
-                // seek to the beginning of the section when socket channel is not available
-                if (sc == null)
-                    file.seek(section.left);
-                // length of the section to stream
-                long length = section.right - section.left;
-                // tracks write progress
-                long bytesTransferred = 0;
-                while (bytesTransferred < length)
-                {
-                    int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
-                    long lastWrite;
-                    if (sc != null)
-                    {
-                        lastWrite = fc.transferTo(section.left + bytesTransferred, toTransfer, sc);
-                        throttle.throttleDelta(lastWrite);
-                    }
-                    else
-                    {
-                        // NIO is not available. Fall back to normal streaming.
-                        // This happens when inter-node encryption is turned on.
-                        if (transferBuffer == null)
-                            transferBuffer = new byte[CHUNK_SIZE];
-                        file.readFully(transferBuffer, 0, toTransfer);
-                        socket.getOutputStream().write(transferBuffer, 0, toTransfer);
-                        throttle.throttleDelta(toTransfer);
-                        lastWrite = toTransfer;
-                    }
-                    totalBytesTransferred += lastWrite;
-                    bytesTransferred += lastWrite;
-                    header.file.progress += lastWrite;
-                }
-
-                if (sc == null)
-                    socket.getOutputStream().flush();
-
-                logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
-            }
-            StreamingMetrics.totalOutgoingBytes.inc(totalBytesTransferred);
-            metrics.outgoingBytes.inc(totalBytesTransferred);
-            // receive reply confirmation
-            receiveReply();
-        }
-        finally
-        {
-            StreamingMetrics.activeStreamsOutbound.dec();
-
-            // no matter what happens close file
-            FileUtils.closeQuietly(file);
-        }
-    }
-
-    // chunks are assumed to be sorted by offset
-    private List<Pair<Long, Long>> getTransferSections(CompressionMetadata.Chunk[] chunks)
-    {
-        List<Pair<Long, Long>> transferSections = new ArrayList<Pair<Long, Long>>();
-        Pair<Long, Long> lastSection = null;
-        for (CompressionMetadata.Chunk chunk : chunks)
-        {
-            if (lastSection != null)
-            {
-                if (chunk.offset == lastSection.right)
-                {
-                    // extend previous section to end of this chunk
-                    lastSection = Pair.create(lastSection.left, chunk.offset + chunk.length + 4); // 4 bytes for CRC
-                }
-                else
-                {
-                    transferSections.add(lastSection);
-                    lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
-                }
-            }
-            else
-            {
-                lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
-            }
-        }
-        if (lastSection != null)
-            transferSections.add(lastSection);
-        return transferSections;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 6fe1793..3305f50 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -17,9 +17,12 @@
  */
 package org.apache.cassandra.streaming.compress;
 
-import java.io.*;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Iterator;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
new file mode 100644
index 0000000..f3e511b
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.compress;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReader;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * StreamReader that reads from a streamed, compressed SSTable.
+ */
+public class CompressedStreamReader extends StreamReader
+{
+    protected final CompressionInfo compressionInfo;
+
+    public CompressedStreamReader(FileMessageHeader header, StreamSession session)
+    {
+        super(header, session);
+        this.compressionInfo = header.compressionInfo;
+    }
+
+    /**
+     * @return SSTable transferred
+     * @throws java.io.IOException if reading the remote sstable fails. Throws a RuntimeException if the local write fails.
+     */
+    @Override
+    public SSTableReader read(ReadableByteChannel channel) throws IOException
+    {
+        long totalSize = totalSize();
+        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
+
+        Pair<String, String> kscf = Schema.instance.getCF(cfId);
+        ColumnFamilyStore cfs = Table.open(kscf.left).getColumnFamilyStore(kscf.right);
+        Directories.DataDirectory localDir = cfs.directories.getLocationCapableOfSize(totalSize);
+        if (localDir == null)
+            throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
+        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
+
+        SSTableWriter writer = new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys);
+        try
+        {
+            BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+
+            for (Pair<Long, Long> section : sections)
+            {
+                long length = section.right - section.left;
+                // skip to beginning of section inside chunk
+                cis.position(section.left);
+                in.reset(0);
+                while (in.getBytesRead() < length)
+                {
+                    writeRow(writer, in, cfs);
+                    // when compressed, report total bytes of compressed chunks read, since totalSize is the sum of the compressed chunks transferred
+                    session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
+                }
+            }
+            return writer.closeAndOpenReader();
+        }
+        catch (Throwable e)
+        {
+            writer.abort();
+            if (e instanceof IOException)
+                throw (IOException) e;
+            else
+                throw Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    protected long totalSize()
+    {
+        long size = 0;
+        // calculate total length of transferring chunks
+        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+            size += chunk.length + 4; // 4 bytes for CRC
+        return size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
new file mode 100644
index 0000000..80fcef5
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.compress;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamWriter;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * StreamWriter for compressed SSTable.
+ */
+public class CompressedStreamWriter extends StreamWriter
+{
+    public static final int CHUNK_SIZE = 10 * 1024 * 1024;
+
+    private final CompressionInfo compressionInfo;
+
+    public CompressedStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, CompressionInfo compressionInfo, StreamSession session)
+    {
+        super(sstable, sections, session);
+        this.compressionInfo = compressionInfo;
+    }
+
+    @Override
+    public void write(WritableByteChannel channel) throws IOException
+    {
+        long totalSize = totalSize();
+        RandomAccessReader file = sstable.openDataReader();
+        FileChannel fc = file.getChannel();
+
+        long progress = 0L;
+        // calculate chunks to transfer. we want to send contiguous chunks together.
+        List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
+        try
+        {
+            // stream each of the required sections of the file
+            for (Pair<Long, Long> section : sections)
+            {
+                // length of the section to stream
+                long length = section.right - section.left;
+                // tracks write progress
+                long bytesTransferred = 0;
+                while (bytesTransferred < length)
+                {
+                    int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
+                    limiter.acquire(toTransfer);
+                    long lastWrite = fc.transferTo(section.left + bytesTransferred, toTransfer, channel);
+                    bytesTransferred += lastWrite;
+                    progress += lastWrite;
+                    session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
+                }
+            }
+        }
+        finally
+        {
+            // no matter what happens close file
+            FileUtils.closeQuietly(file);
+        }
+
+        sstable.releaseReference();
+    }
+
+    @Override
+    protected long totalSize()
+    {
+        long size = 0;
+        // calculate total length of transferring chunks
+        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+            size += chunk.length + 4; // 4 bytes for CRC
+        return size;
+    }
+
+    // chunks are assumed to be sorted by offset
+    private List<Pair<Long, Long>> getTransferSections(CompressionMetadata.Chunk[] chunks)
+    {
+        List<Pair<Long, Long>> transferSections = new ArrayList<>();
+        Pair<Long, Long> lastSection = null;
+        for (CompressionMetadata.Chunk chunk : chunks)
+        {
+            if (lastSection != null)
+            {
+                if (chunk.offset == lastSection.right)
+                {
+                    // extend previous section to end of this chunk
+                    lastSection = Pair.create(lastSection.left, chunk.offset + chunk.length + 4); // 4 bytes for CRC
+                }
+                else
+                {
+                    transferSections.add(lastSection);
+                    lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
+                }
+            }
+            else
+            {
+                lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
+            }
+        }
+        if (lastSection != null)
+            transferSections.add(lastSection);
+        return transferSections;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
new file mode 100644
index 0000000..b561abc
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.streaming.StreamSession;
+
+public class CompleteMessage extends StreamMessage
+{
+    public static Serializer<CompleteMessage> serializer = new Serializer<CompleteMessage>()
+    {
+        public CompleteMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+        {
+            return new CompleteMessage();
+        }
+
+        public void serialize(CompleteMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException {}
+    };
+
+    public CompleteMessage()
+    {
+        super(Type.COMPLETE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/FileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessage.java b/src/java/org/apache/cassandra/streaming/messages/FileMessage.java
new file mode 100644
index 0000000..fe05eac
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessage.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.streaming.StreamReader;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamWriter;
+import org.apache.cassandra.streaming.compress.CompressedStreamReader;
+import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
+import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * FileMessage is used to transfer/receive part (or the whole) of an SSTable data file.
+ */
+public class FileMessage extends StreamMessage
+{
+    public static Serializer<FileMessage> serializer = new Serializer<FileMessage>()
+    {
+        public FileMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+        {
+            DataInputStream input = new DataInputStream(Channels.newInputStream(in));
+            FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version);
+            StreamReader reader = header.compressionInfo == null ? new StreamReader(header, session)
+                                          : new CompressedStreamReader(header, session);
+
+            try
+            {
+                return new FileMessage(reader.read(in), header);
+            }
+            catch (Throwable e)
+            {
+                session.doRetry(header, e);
+                return null;
+            }
+        }
+
+        public void serialize(FileMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException
+        {
+            DataOutput output = new DataOutputStream(Channels.newOutputStream(out));
+            FileMessageHeader.serializer.serialize(message.header, output, version);
+            StreamWriter writer = message.header.compressionInfo == null ?
+                                          new StreamWriter(message.sstable, message.header.sections, session) :
+                                          new CompressedStreamWriter(message.sstable,
+                                                                     message.header.sections,
+                                                                     message.header.compressionInfo, session);
+            writer.write(out);
+            session.fileSent(message.header);
+        }
+    };
+
+    public final FileMessageHeader header;
+    public final SSTableReader sstable;
+
+    public FileMessage(SSTableReader sstable, FileMessageHeader header)
+    {
+        super(Type.FILE);
+        this.header = header;
+        this.sstable = sstable;
+    }
+
+    public FileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections)
+    {
+        super(Type.FILE);
+        this.sstable = sstable;
+
+        CompressionInfo compressionInfo = null;
+        if (sstable.compression)
+        {
+            CompressionMetadata meta = sstable.getCompressionMetadata();
+            compressionInfo = new CompressionInfo(meta.getChunksForSections(sections), meta.parameters);
+        }
+        this.header = new FileMessageHeader(sstable.metadata.cfId,
+                                            sequenceNumber,
+                                            sstable.descriptor.version.toString(),
+                                            estimatedKeys,
+                                            sections,
+                                            compressionInfo);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
new file mode 100644
index 0000000..761e086
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * FileMessageHeader is sent ahead of the actual file data to describe what is being sent.
+ */
+public class FileMessageHeader
+{
+    public static IVersionedSerializer<FileMessageHeader> serializer = new FileMessageHeaderSerializer();
+
+    public final UUID cfId;
+    public final int sequenceNumber;
+    /** SSTable version */
+    public final String version;
+    public final long estimatedKeys;
+    public final List<Pair<Long, Long>> sections;
+    public final CompressionInfo compressionInfo;
+
+    public FileMessageHeader(UUID cfId,
+                             int sequenceNumber,
+                             String version,
+                             long estimatedKeys,
+                             List<Pair<Long, Long>> sections,
+                             CompressionInfo compressionInfo)
+    {
+        this.cfId = cfId;
+        this.sequenceNumber = sequenceNumber;
+        this.version = version;
+        this.estimatedKeys = estimatedKeys;
+        this.sections = sections;
+        this.compressionInfo = compressionInfo;
+    }
+
+    /**
+     * @return total file size to transfer in bytes
+     */
+    public long size()
+    {
+        long size = 0;
+        if (compressionInfo != null)
+        {
+            // calculate total length of transferring chunks
+            for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+                size += chunk.length + 4; // 4 bytes for CRC
+        }
+        else
+        {
+            for (Pair<Long, Long> section : sections)
+                size += section.right - section.left;
+        }
+        return size;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        FileMessageHeader that = (FileMessageHeader) o;
+        return sequenceNumber == that.sequenceNumber && cfId.equals(that.cfId);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = cfId.hashCode();
+        result = 31 * result + sequenceNumber;
+        return result;
+    }
+
+    static class FileMessageHeaderSerializer implements IVersionedSerializer<FileMessageHeader>
+    {
+        public void serialize(FileMessageHeader header, DataOutput out, int version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(header.cfId, out, version);
+            out.writeInt(header.sequenceNumber);
+            out.writeUTF(header.version);
+            out.writeLong(header.estimatedKeys);
+
+            out.writeInt(header.sections.size());
+            for (Pair<Long, Long> section : header.sections)
+            {
+                out.writeLong(section.left);
+                out.writeLong(section.right);
+            }
+            CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
+        }
+
+        public FileMessageHeader deserialize(DataInput in, int version) throws IOException // reads fields in the order serialize() writes them
+        {
+            UUID cfId = UUIDSerializer.serializer.deserialize(in, version); // same version serialize() hands to UUIDSerializer
+            int sequenceNumber = in.readInt();
+            String sstableVersion = in.readUTF();
+            long estimatedKeys = in.readLong();
+            int count = in.readInt(); // number of (left, right) section pairs that follow
+            List<Pair<Long, Long>> sections = new ArrayList<>(count);
+            for (int k = 0; k < count; k++)
+                sections.add(Pair.create(in.readLong(), in.readLong())); // operands evaluate left to right: left bound, then right bound
+            CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version); // same version serialize() hands to CompressionInfo
+            return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo);
+        }
+
+        public long serializedSize(FileMessageHeader header, int version)
+        {
+            long size = UUIDSerializer.serializer.serializedSize(header.cfId, version);
+            size += TypeSizes.NATIVE.sizeof(header.sequenceNumber);
+            size += TypeSizes.NATIVE.sizeof(header.version);
+            size += TypeSizes.NATIVE.sizeof(header.estimatedKeys);
+
+            size += TypeSizes.NATIVE.sizeof(header.sections.size());
+            for (Pair<Long, Long> section : header.sections)
+            {
+                size += TypeSizes.NATIVE.sizeof(section.left);
+                size += TypeSizes.NATIVE.sizeof(section.right);
+            }
+            size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
+            return size;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
new file mode 100644
index 0000000..ba94592
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.*;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.cassandra.streaming.StreamRequest;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+
+public class PrepareMessage extends StreamMessage
+{
+    public static Serializer<PrepareMessage> serializer = new Serializer<PrepareMessage>()
+    {
+        public PrepareMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+        {
+            DataInput input = new DataInputStream(Channels.newInputStream(in));
+            PrepareMessage message = new PrepareMessage();
+            // requests
+            int numRequests = input.readInt();
+            for (int i = 0; i < numRequests; i++)
+                message.requests.add(StreamRequest.serializer.deserialize(input, version));
+            // summaries
+            int numSummaries = input.readInt();
+            for (int i = 0; i < numSummaries; i++)
+                message.summaries.add(StreamSummary.serializer.deserialize(input, version));
+            return message;
+        }
+
+        public void serialize(PrepareMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException
+        {
+            DataOutput output = new DataOutputStream(Channels.newOutputStream(out));
+            // requests
+            output.writeInt(message.requests.size());
+            for (StreamRequest request : message.requests)
+                StreamRequest.serializer.serialize(request, output, version);
+            // summaries
+            output.writeInt(message.summaries.size());
+            for (StreamSummary summary : message.summaries)
+                StreamSummary.serializer.serialize(summary, output, version);
+        }
+    };
+
+    /**
+     * Streaming requests
+     */
+    public final Collection<StreamRequest> requests = new ArrayList<>();
+
+    /**
+     * Summaries of streaming out
+     */
+    public final Collection<StreamSummary> summaries = new ArrayList<>();
+
+    public PrepareMessage()
+    {
+        super(Type.PREPARE);
+    }
+
+    @Override
+    public String toString()
+    {
+        final StringBuilder sb = new StringBuilder("PrepareMessage{");
+        sb.append(requests.size()).append(" requests, ");
+        int totalFile = 0;
+        for (StreamSummary summary : summaries)
+            totalFile += summary.files;
+        sb.append(totalFile).append(" files receiving");
+        sb.append('}');
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
new file mode 100644
index 0000000..67b4dee
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.*;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.UUID;
+
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+public class RetryMessage extends StreamMessage
+{
+    public static Serializer<RetryMessage> serializer = new Serializer<RetryMessage>()
+    {
+        public RetryMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+        {
+            DataInput input = new DataInputStream(Channels.newInputStream(in));
+            return new RetryMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
+        }
+
+        public void serialize(RetryMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException
+        {
+            DataOutput output = new DataOutputStream(Channels.newOutputStream(out));
+            UUIDSerializer.serializer.serialize(message.cfId, output, MessagingService.current_version);
+            output.writeInt(message.sequenceNumber);
+        }
+    };
+
+    public final UUID cfId;
+    public final int sequenceNumber;
+
+    public RetryMessage(UUID cfId, int sequenceNumber)
+    {
+        super(Type.RETRY);
+        this.cfId = cfId;
+        this.sequenceNumber = sequenceNumber;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
new file mode 100644
index 0000000..450f67d
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.streaming.StreamSession;
+
+public class SessionFailedMessage extends StreamMessage
+{
+    public static Serializer<SessionFailedMessage> serializer = new Serializer<SessionFailedMessage>()
+    {
+        public SessionFailedMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+        {
+            return new SessionFailedMessage();
+        }
+
+        public void serialize(SessionFailedMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException {}
+    };
+
+    public SessionFailedMessage()
+    {
+        super(Type.SESSION_FAILED);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
new file mode 100644
index 0000000..d1e797c
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * StreamInitMessage is first sent from the node where {@link org.apache.cassandra.streaming.StreamSession} is started,
+ * to initiate corresponding {@link org.apache.cassandra.streaming.StreamSession} on the other side.
+ */
+public class StreamInitMessage
+{
+    public static IVersionedSerializer<StreamInitMessage> serializer = new StreamInitMessageSerializer();
+
+    public final UUID planId;
+    public final String description;
+
+    public StreamInitMessage(UUID planId, String description)
+    {
+        this.planId = planId;
+        this.description = description;
+    }
+
+    /**
+     * Builds the wire form of this message: protocol magic, a header int, then the serialized body.
+     *
+     * @param compress true to set the compression flag (value 4) in the header
+     * @param version Streaming protocol version, stored in the header shifted left by 8 bits
+     * @return buffer flipped for reading, holding magic (4 bytes), header (4 bytes) and the body
+     */
+    public ByteBuffer createMessage(boolean compress, int version)
+    {
+        int header = 0;
+        // set compression bit.
+        if (compress)
+            header |= 4;
+        // set streaming bit
+        header |= 8;
+        // store the version in the bits above the low byte
+        header |= (version << 8);
+
+        /* Serialize the message body that follows the header.
+         * As written by StreamInitMessageSerializer the body is:
+         * | Plan Id (UUID) | Description (UTF string) |
+         * The output buffer is sized up front from serializedSize().
+         */
+        byte[] bytes;
+        try
+        {
+            int size = (int)StreamInitMessage.serializer.serializedSize(this, version);
+            DataOutputBuffer buffer = new DataOutputBuffer(size);
+            StreamInitMessage.serializer.serialize(this, buffer, version);
+            bytes = buffer.getData();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        assert bytes.length > 0;
+
+        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + bytes.length);
+        buffer.putInt(MessagingService.PROTOCOL_MAGIC);
+        buffer.putInt(header);
+        buffer.put(bytes);
+        buffer.flip();
+        return buffer;
+    }
+
+    private static class StreamInitMessageSerializer implements IVersionedSerializer<StreamInitMessage>
+    {
+        public void serialize(StreamInitMessage message, DataOutput out, int version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version);
+            out.writeUTF(message.description);
+        }
+
+        public StreamInitMessage deserialize(DataInput in, int version) throws IOException
+        {
+            UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+            return new StreamInitMessage(planId, in.readUTF());
+        }
+
+        public long serializedSize(StreamInitMessage message, int version)
+        {
+            long size = UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
+            size += TypeSizes.NATIVE.sizeof(message.description);
+            return size;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
new file mode 100644
index 0000000..f737675
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.streaming.StreamSession;
+
+/**
+ * StreamMessage is the abstract base class that every message in the streaming protocol inherits from.
+ *
+ * On the wire, every message starts with a one-byte message type ({@link Type}), followed by its type-specific body.
+ */
+public abstract class StreamMessage
+{
+    /** Streaming protocol version */
+    public static final int CURRENT_VERSION = 1;
+
+    public static void serialize(StreamMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException
+    {
+        ByteBuffer buff = ByteBuffer.allocate(1);
+        // message type
+        buff.put(message.type.type);
+        buff.flip();
+        out.write(buff);
+        message.type.serializer.serialize(message, out, version, session);
+    }
+
+    public static StreamMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+    {
+        ByteBuffer buff = ByteBuffer.allocate(1); // holds the one-byte message type written by serialize()
+        while (buff.hasRemaining()) // read() may transfer 0 bytes, and returns -1 at end-of-stream
+            if (in.read(buff) < 0) throw new java.io.EOFException("Channel closed before the message type byte was read");
+        buff.flip();
+        return Type.get(buff.get()).serializer.deserialize(in, version, session); // Type.get throws IllegalArgumentException on an unknown type
+    }
+
+    /** StreamMessage serializer */
+    public static interface Serializer<V extends StreamMessage>
+    {
+        V deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException;
+        void serialize(V message, WritableByteChannel out, int version, StreamSession session) throws IOException;
+    }
+
+    /** StreamMessage types */
+    public static enum Type
+    {
+        PREPARE(1, 5, PrepareMessage.serializer),
+        FILE(2, 0, FileMessage.serializer),
+        RETRY(3, 1, RetryMessage.serializer),
+        COMPLETE(4, 4, CompleteMessage.serializer),
+        SESSION_FAILED(5, 5, SessionFailedMessage.serializer);
+
+        public static Type get(byte type)
+        {
+            for (Type t : Type.values())
+            {
+                if (t.type == type)
+                    return t;
+            }
+            throw new IllegalArgumentException("Unknown type " + type);
+        }
+
+        private final byte type;
+        public final int priority;
+        public final Serializer<StreamMessage> serializer;
+
+        @SuppressWarnings("unchecked")
+        private Type(int type, int priority, Serializer serializer)
+        {
+            this.type = (byte) type;
+            this.priority = priority;
+            this.serializer = serializer;
+        }
+    }
+
+    public final Type type;
+
+    protected StreamMessage(Type type)
+    {
+        this.type = type;
+    }
+
+    /**
+     * @return priority of this message. higher value, higher priority.
+     */
+    public int getPriority()
+    {
+        return type.priority;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index d2f3c4d..12650a2 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -24,6 +24,11 @@ import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.*;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
 
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,17 +37,10 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableLoader;
-import org.apache.cassandra.streaming.PendingFile;
+import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.OutputHandler;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-
-import com.google.common.util.concurrent.Uninterruptibles;
 
 public class BulkLoader
 {
@@ -61,52 +59,20 @@ public class BulkLoader
     public static void main(String args[])
     {
         LoaderOptions options = LoaderOptions.parseArgs(args);
+        OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
+        SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd), handler);
+        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
+        StreamResultFuture future = loader.stream(options.ignores);
+        future.addEventListener(new ProgressIndicator(handler));
         try
         {
-            OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
-            SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(handler, options.hosts, options.rpcPort, options.user, options.passwd), handler);
-            DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
-            SSTableLoader.LoaderFuture future = loader.stream(options.ignores);
-
-            if (options.noProgress)
-            {
-                future.get();
-            }
-            else
-            {
-                ProgressIndicator indicator = new ProgressIndicator(future.getPendingFiles());
-                indicator.start();
-                System.out.println("");
-                boolean printEnd = false;
-                while (!future.isDone())
-                {
-                    if (indicator.printProgress())
-                    {
-                        // We're done with streaming
-                        System.out.println("\nWaiting for targets to rebuild indexes ...");
-                        printEnd = true;
-                        future.get();
-                        assert future.isDone();
-                    }
-                    else
-                    {
-                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-                    }
-                }
-                if (!printEnd)
-                    indicator.printProgress();
-                if (future.hadFailures())
-                {
-                    System.err.println("Streaming to the following hosts failed:");
-                    System.err.println(future.getFailedHosts());
-                    System.exit(1);
-                }
-            }
-
+            future.get();
             System.exit(0); // We need that to stop non daemonized threads
         }
         catch (Exception e)
         {
+            System.err.println("Streaming to the following hosts failed:");
+            System.err.println(loader.getFailedHosts());
             System.err.println(e.getMessage());
             if (options.debug)
                 e.printStackTrace(System.err);
@@ -115,62 +81,63 @@ public class BulkLoader
     }
 
     // Return true when everything is at 100%
-    static class ProgressIndicator
+    static class ProgressIndicator implements StreamEventHandler
     {
-        private final Map<InetAddress, Collection<PendingFile>> filesByHost;
+        private final Set<ProgressInfo> progresses = new HashSet<>();
+        private final OutputHandler handler;
+
         private long start;
         private long lastProgress;
         private long lastTime;
 
-        public ProgressIndicator(Map<InetAddress, Collection<PendingFile>> filesByHost)
-        {
-            this.filesByHost = new HashMap<InetAddress, Collection<PendingFile>>(filesByHost);
-        }
-
-        public void start()
+        public ProgressIndicator(OutputHandler handler)
         {
+            this.handler = handler;
             start = lastTime = System.nanoTime();
         }
 
-        public boolean printProgress()
+        public void onSuccess(StreamState finalState) {}
+        public void onFailure(Throwable t) {}
+
+        public void handleStreamEvent(StreamEvent event)
         {
-            boolean done = true;
-            StringBuilder sb = new StringBuilder();
-            sb.append("\rprogress: ");
-            long totalProgress = 0;
-            long totalSize = 0;
-            for (Map.Entry<InetAddress, Collection<PendingFile>> entry : filesByHost.entrySet())
+            if (event.eventType == StreamEvent.Type.FILE_PROGRESS)
             {
-                long progress = 0;
-                long size = 0;
-                int completed = 0;
-                Collection<PendingFile> pendings = entry.getValue();
-                for (PendingFile f : pendings)
+                ProgressInfo progressInfo = ((StreamEvent.ProgressEvent) event).progress;
+
+                // update progress
+                if (progresses.contains(progressInfo))
+                    progresses.remove(progressInfo);
+                progresses.add(progressInfo);
+
+                StringBuilder sb = new StringBuilder();
+                sb.append("\rprogress: ");
+
+                long totalProgress = 0;
+                long totalSize = 0;
+                long completed = 0;
+                for (ProgressInfo entry : progresses)
                 {
-                    progress += f.progress;
-                    size += f.size;
-                    if (f.progress == f.size)
+                    if (entry.currentBytes == entry.totalBytes)
                         completed++;
+                    totalProgress += entry.currentBytes;
+                    totalSize += entry.totalBytes;
+                    sb.append("[").append(entry.peer);
+                    sb.append(" ").append(completed).append("/").append(progresses.size());
+                    sb.append(" (").append(entry.totalBytes == 0 ? 100L : entry.currentBytes * 100L / entry.totalBytes).append(")] ");
                 }
-                totalProgress += progress;
-                totalSize += size;
-                if (completed != pendings.size())
-                    done = false;
-                sb.append("[").append(entry.getKey());
-                sb.append(" ").append(completed).append("/").append(pendings.size());
-                sb.append(" (").append(size == 0 ? 100L : progress * 100L / size).append(")] ");
+                long time = System.nanoTime();
+                long deltaTime = TimeUnit.NANOSECONDS.toMillis(time - lastTime);
+                lastTime = time;
+                long deltaProgress = totalProgress - lastProgress;
+                lastProgress = totalProgress;
+
+                sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append(" - ");
+                sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
+                sb.append(" (avg: ").append(mbPerSec(totalProgress, TimeUnit.NANOSECONDS.toMillis(time - start))).append("MB/s)]");
+
+                handler.output(sb.toString());
             }
-            long time = System.nanoTime();
-            long deltaTime = TimeUnit.NANOSECONDS.toMillis(time - lastTime);
-            lastTime = time;
-            long deltaProgress = totalProgress - lastProgress;
-            lastProgress = totalProgress;
-
-            sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append(" - ");
-            sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
-            sb.append(" (avg: ").append(mbPerSec(totalProgress, TimeUnit.NANOSECONDS.toMillis(time - start))).append("MB/s)]");
-            System.out.print(sb.toString());
-            return done;
         }
 
         private int mbPerSec(long bytes, long timeInMs)
@@ -182,13 +149,13 @@ public class BulkLoader
 
     static class ExternalClient extends SSTableLoader.Client
     {
-        private final Set<String> knownCfs = new HashSet<String>();
+        private final Set<String> knownCfs = new HashSet<>();
         private final Set<InetAddress> hosts;
         private final int rpcPort;
         private final String user;
         private final String passwd;
 
-        public ExternalClient(OutputHandler outputHandler, Set<InetAddress> hosts, int port, String user, String passwd)
+        public ExternalClient(Set<InetAddress> hosts, int port, String user, String passwd)
         {
             super();
             this.hosts = hosts;
@@ -213,7 +180,7 @@ public class BulkLoader
 
                     for (TokenRange tr : client.describe_ring(keyspace))
                     {
-                        Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
+                        Range<Token> range = new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
                         for (String ep : tr.endpoints)
                         {
                             addRangeForEndpoint(range, InetAddress.getByName(ep));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index b0b9e23..48ae998 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -45,6 +45,9 @@ import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
 import org.apache.cassandra.net.MessagingServiceMBean;
 import org.apache.cassandra.service.CacheServiceMBean;
 import org.apache.cassandra.service.StorageProxyMBean;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.SessionInfo;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.Pair;
 
@@ -614,56 +617,34 @@ public class NodeCmd
     public void printNetworkStats(final InetAddress addr, PrintStream outs)
     {
         outs.printf("Mode: %s%n", probe.getOperationMode());
-        Set<InetAddress> hosts = addr == null ? probe.getStreamDestinations() : new HashSet<InetAddress>(){{add(addr);}};
-        if (hosts.size() == 0)
+        Set<StreamState> statuses = probe.getStreamStatus();
+        if (statuses.isEmpty())
             outs.println("Not sending any streams.");
-        for (InetAddress host : hosts)
+        for (StreamState status : statuses)
         {
-            try
+            outs.printf("%s %s%n", status.description, status.planId.toString());
+            for (SessionInfo info : status.sessions)
             {
-                List<String> files = probe.getFilesDestinedFor(host);
-                if (files.size() > 0)
+                outs.printf("    %s%n", info.peer.toString());
+                if (!info.receivingSummaries.isEmpty())
                 {
-                    outs.printf("Streaming to: %s%n", host);
-                    for (String file : files)
-                        outs.printf("   %s%n", file);
+                    outs.printf("        Receiving %d files, %d bytes total%n", info.getTotalFilesToReceive(), info.getTotalSizeToReceive());
+                    for (ProgressInfo progress : info.getReceivingFiles())
+                    {
+                        outs.printf("            %s%n", progress.toString());
+                    }
                 }
-                else
+                if (!info.sendingSummaries.isEmpty())
                 {
-                    outs.printf(" Nothing streaming to %s%n", host);
+                    outs.printf("        Sending %d files, %d bytes total%n", info.getTotalFilesToSend(), info.getTotalSizeToSend());
+                    for (ProgressInfo progress : info.getSendingFiles())
+                    {
+                        outs.printf("            %s%n", progress.toString());
+                    }
                 }
             }
-            catch (IOException ex)
-            {
-                outs.printf("   Error retrieving file data for %s%n", host);
-            }
         }
 
-        hosts = addr == null ? probe.getStreamSources() : new HashSet<InetAddress>(){{add(addr); }};
-        if (hosts.size() == 0)
-            outs.println("Not receiving any streams.");
-        for (InetAddress host : hosts)
-        {
-            try
-            {
-                List<String> files = probe.getIncomingFiles(host);
-                if (files.size() > 0)
-                {
-                    outs.printf("Streaming from: %s%n", host);
-                    for (String file : files)
-                        outs.printf("   %s%n", file);
-                }
-                else
-                {
-                    outs.printf(" Nothing streaming from %s%n", host);
-                }
-            }
-            catch (IOException ex)
-            {
-                outs.printf("   Error retrieving file data for %s%n", host);
-            }
-        }
-        
         outs.printf("Read Repair Statistics:%nAttempted: %d%nMismatch (Blocking): %d%nMismatch (Background): %d%n", probe.getReadRepairAttempted(), probe.getReadRepairRepairedBlocking(), probe.getReadRepairRepairedBackground());
 
         MessagingServiceMBean ms = probe.msProxy;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index af94d14..452e865 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -50,8 +50,8 @@ import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MessagingServiceMBean;
 import org.apache.cassandra.service.*;
-import org.apache.cassandra.streaming.StreamingService;
-import org.apache.cassandra.streaming.StreamingServiceMBean;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.StreamManagerMBean;
 import org.apache.cassandra.utils.SimpleCondition;
 
 /**
@@ -73,7 +73,7 @@ public class NodeProbe
     private StorageServiceMBean ssProxy;
     private MemoryMXBean memProxy;
     private RuntimeMXBean runtimeProxy;
-    private StreamingServiceMBean streamProxy;
+    private StreamManagerMBean streamProxy;
     public MessagingServiceMBean msProxy;
     private FailureDetectorMBean fdProxy;
     private CacheServiceMBean cacheService;
@@ -150,8 +150,8 @@ public class NodeProbe
             ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
             name = new ObjectName(MessagingService.MBEAN_NAME);
             msProxy = JMX.newMBeanProxy(mbeanServerConn, name, MessagingServiceMBean.class);
-            name = new ObjectName(StreamingService.MBEAN_OBJECT_NAME);
-            streamProxy = JMX.newMBeanProxy(mbeanServerConn, name, StreamingServiceMBean.class);
+            name = new ObjectName(StreamManagerMBean.OBJECT_NAME);
+            streamProxy = JMX.newMBeanProxy(mbeanServerConn, name, StreamManagerMBean.class);
             name = new ObjectName(CompactionManager.MBEAN_OBJECT_NAME);
             compactionProxy = JMX.newMBeanProxy(mbeanServerConn, name, CompactionManagerMBean.class);
             name = new ObjectName(FailureDetector.MBEAN_NAME);
@@ -545,24 +545,9 @@ public class NodeProbe
         return cfsProxy.getSSTablesForKey(key);
     }
 
-    public Set<InetAddress> getStreamDestinations()
+    public Set<StreamState> getStreamStatus()
     {
-        return streamProxy.getStreamDestinations();
-    }
-
-    public List<String> getFilesDestinedFor(InetAddress host) throws IOException
-    {
-        return streamProxy.getOutgoingFiles(host.getHostAddress());
-    }
-
-    public Set<InetAddress> getStreamSources()
-    {
-        return streamProxy.getStreamSources();
-    }
-
-    public List<String> getIncomingFiles(InetAddress host) throws IOException
-    {
-        return streamProxy.getIncomingFiles(host.getHostAddress());
+        return streamProxy.getCurrentStreams();
     }
 
     public String getOperationMode()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 7bbe72d..17fa3fd 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.OperationType;
 
 import static org.junit.Assert.*;
 
@@ -66,7 +65,7 @@ public class BootStrapperTest extends SchemaLoader
 
         TokenMetadata tmd = ss.getTokenMetadata();
         assertEquals(numOldNodes, tmd.sortedTokens().size());
-        RangeStreamer s = new RangeStreamer(tmd, myEndpoint, OperationType.BOOTSTRAP);
+        RangeStreamer s = new RangeStreamer(tmd, myEndpoint, "Bootstrap");
         IFailureDetector mockFailureDetector = new IFailureDetector()
         {
             public boolean isAlive(InetAddress ep)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 4f3217c..1b3eb48 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -27,29 +27,23 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.*;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.sink.IMessageSink;
 import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.streaming.StreamUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class RemoveTest
 {
@@ -119,9 +113,6 @@ public class RemoveTest
     @Test
     public void testRemoveHostId() throws InterruptedException
     {
-        ReplicationSink rSink = new ReplicationSink();
-        SinkManager.add(rSink);
-
         // start removal in background and send replication confirmations
         final AtomicBoolean success = new AtomicBoolean(false);
         Thread remover = new Thread()
@@ -159,25 +150,4 @@ public class RemoveTest
         assertTrue(success.get());
         assertTrue(tmd.getLeavingEndpoints().isEmpty());
     }
-
-    /**
-     * sink that captures STREAM_REQUEST messages and calls finishStreamRequest on it
-     */
-    class ReplicationSink implements IMessageSink
-    {
-        public MessageIn handleMessage(MessageIn msg, int id, InetAddress to)
-        {
-            if (!msg.verb.equals(MessagingService.Verb.STREAM_REQUEST))
-                return msg;
-
-            StreamUtil.finishStreamRequest(msg, to);
-
-            return null;
-        }
-
-        public MessageOut handleMessage(MessageOut msg, int id, InetAddress to)
-        {
-            return msg;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/BootstrapTest.java b/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
deleted file mode 100644
index 6578dde..0000000
--- a/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.streaming;
-
-import static junit.framework.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.utils.Pair;
-
-import java.util.Arrays;
-
-import org.junit.Test;
-
-public class BootstrapTest extends SchemaLoader
-{
-    @Test
-    public void testGetNewNames() throws IOException
-    {
-        String ver = Descriptor.Version.current_version;
-        Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Keyspace1-Standard1-" + ver +"-500-Data.db").toString());
-        // assert !desc.isLatestVersion; // minimum compatible version -- for now it is the latest as well
-        PendingFile inContext = new PendingFile(null, desc, "Data.db", Arrays.asList(Pair.create(0L, 1L)), OperationType.BOOTSTRAP);
-
-        PendingFile outContext = StreamIn.getContextMapping(inContext);
-        // filename and generation are expected to have changed
-        assert !inContext.getFilename().equals(outContext.getFilename());
-
-        // nothing else should
-        assertEquals(inContext.component, outContext.component);
-        assertEquals(desc.ksname, outContext.desc.ksname);
-        assertEquals(desc.cfname, outContext.desc.cfname);
-        assertEquals(desc.version, outContext.desc.version);
-    }
-}