Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/20 19:07:16 UTC

[4/5] Streaming 2.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
deleted file mode 100644
index 04890ba..0000000
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.ning.compress.lzf.LZFOutputStream;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.util.DataIntegrityMetadata;
-import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.metrics.StreamingMetrics;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.Throttle;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-public class FileStreamTask extends WrappedRunnable
-{
-    private static final Logger logger = LoggerFactory.getLogger(FileStreamTask.class);
-
-    private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
-    public static final int MAX_CONNECT_ATTEMPTS = 4;
-
-    protected final StreamHeader header;
-    protected final InetAddress to;
-
-    // communication socket
-    protected Socket socket;
-    // socket's output/input stream
-    private OutputStream output;
-    private OutputStream compressedoutput;
-    private DataInputStream input;
-    // allocate buffer to use for transfers only once
-    private byte[] transferBuffer;
-    // outbound global throughput limiter
-    protected final Throttle throttle;
-    private final StreamReplyVerbHandler handler = new StreamReplyVerbHandler();
-    protected final StreamingMetrics metrics;
-
-    public FileStreamTask(StreamHeader header, InetAddress to)
-    {
-        this.header = header;
-        this.to = to;
-        this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
-        {
-            /** @return Instantaneous throughput target in bytes per millisecond. */
-            public int targetThroughput()
-            {
-                if (DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() < 1)
-                    // throttling disabled
-                    return 0;
-                // total throughput
-                int totalBytesPerMS = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * 1024 * 1024 / 8 / 1000;
-                // per stream throughput (target bytes per MS)
-                return totalBytesPerMS / Math.max(1, (int)StreamingMetrics.activeStreamsOutbound.count());
-            }
-        });
-        metrics = StreamingMetrics.get(to);
-    }
-
-    public void runMayThrow() throws IOException
-    {
-        try
-        {
-            connectAttempt();
-            // successfully connected: stream.
-            // (at this point, if we fail, it is the receiver's job to re-request)
-            stream();
-
-            StreamOutSession session = StreamOutSession.get(header.sessionId);
-            if (session == null)
-            {
-                logger.info("Found no stream out session at end of file stream task - this is expected if the receiver went down");
-            }
-            else if (session.getFiles().size() == 0)
-            {
-                // we are the last of our kind, receive the final confirmation before closing
-                receiveReply();
-                logger.info("Finished streaming session to {}", to);
-            }
-        }
-        catch (IOException e)
-        {
-            StreamOutSession session = StreamOutSession.get(header.sessionId);
-            if (session != null)
-                session.close(false);
-            throw e;
-        }
-        finally
-        {
-            try
-            {
-                close();
-            }
-            catch (IOException e)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("error closing socket", e);
-            }
-        }
-        if (logger.isDebugEnabled())
-            logger.debug("Done streaming " + header.file);
-    }
-
-    /**
-     * Stream the file by the sections specified by this.header
-     * @throws IOException on any I/O error
-     */
-    protected void stream() throws IOException
-    {
-        ByteBuffer headerBuffer = MessagingService.instance().constructStreamHeader(header, false, MessagingService.instance().getVersion(to));
-        // write header (this should not be compressed for compatibility with other messages)
-        output.write(ByteBufferUtil.getArray(headerBuffer));
-
-        if (header.file == null)
-            return;
-
-        // try to skip kernel page cache if possible
-        RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()));
-        Descriptor desc = Descriptor.fromFilename(header.file.getFilename());
-        ChecksumValidator metadata = null;
-        if (new File(desc.filenameFor(Component.CRC)).exists())
-            metadata = DataIntegrityMetadata.checksumValidator(desc);
-        transferBuffer = metadata == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[metadata.chunkSize];
-
-        // setting up data compression stream
-        compressedoutput = new LZFOutputStream(output);
-
-        StreamingMetrics.activeStreamsOutbound.inc();
-        try
-        {
-            long totalBytesTransferred = 0;
-            // stream each of the required sections of the file
-            for (Pair<Long, Long> section : header.file.sections)
-            {
-                long start = metadata == null ? section.left : metadata.chunkStart(section.left);
-                int skipBytes = (int) (section.left - start);
-                // seek to the beginning of the section
-                file.seek(start);
-                if (metadata != null)
-                    metadata.seek(start);
-
-                // length of the section to read
-                long length = section.right - start;
-                // tracks write progress
-                long bytesTransferred = 0;
-
-                while (bytesTransferred < length)
-                {
-                    long lastWrite = write(file, metadata, skipBytes, length, bytesTransferred);
-                    bytesTransferred += lastWrite;
-                    totalBytesTransferred += lastWrite;
-                    // store streaming progress
-                    header.file.progress += lastWrite;
-                    skipBytes = 0;
-                }
-
-                // make sure the current section is sent
-                compressedoutput.flush();
-
-                if (logger.isDebugEnabled())
-                    logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
-            }
-            StreamingMetrics.totalOutgoingBytes.inc(totalBytesTransferred);
-            metrics.outgoingBytes.inc(totalBytesTransferred);
-            // receive reply confirmation
-            receiveReply();
-        }
-        finally
-        {
-            StreamingMetrics.activeStreamsOutbound.dec();
-
-            // no matter what happens close file
-            FileUtils.closeQuietly(file);
-        }
-    }
-
-    public static void sendReply(MessageOut message, DataOutputStream out) throws IOException
-    {
-        out.writeInt(MessagingService.PROTOCOL_MAGIC);
-        message.serialize(out, MessagingService.current_version);
-    }
-
-    protected void receiveReply() throws IOException
-    {
-        MessagingService.validateMagic(input.readInt());
-        // since we reject streaming with a different version, using current_version here is fine
-        MessageIn message = MessageIn.read(input, MessagingService.current_version, -1);
-        assert message.verb == MessagingService.Verb.STREAM_REPLY : "Non-reply message received on stream socket";
-        handler.doVerb(message, -1);
-    }
-
-    /**
-     * Sequentially read bytes from the file and write them to the output stream
-     *
-     * @param reader The file reader to read from
-     * @param validator validator to verify data integrity
-     * @param start number of bytes at the start of the buffer to skip sending, but still run through validation
-     * @param length The full length that should be transferred
-     * @param bytesTransferred Number of bytes already transferred
-     *
-     * @return Number of bytes transferred
-     *
-     * @throws IOException on any I/O error
-     */
-    protected long write(RandomAccessReader reader, ChecksumValidator validator, int start, long length, long bytesTransferred) throws IOException
-    {
-        int toTransfer = (int) Math.min(transferBuffer.length, length - bytesTransferred);
-        int minReadable = (int) Math.min(transferBuffer.length, reader.length() - reader.getFilePointer());
-
-        reader.readFully(transferBuffer, 0, minReadable);
-        if (validator != null)
-            validator.validate(transferBuffer, 0, minReadable);
-
-        compressedoutput.write(transferBuffer, start, (toTransfer - start));
-        throttle.throttleDelta(toTransfer);
-
-        return toTransfer;
-    }
-
-    /**
-     * Connects to the destination, with backoff for failed attempts.
-     * TODO: all nodes on a cluster must currently use the same storage port
-     * @throws IOException If all attempts fail.
-     */
-    private void connectAttempt() throws IOException
-    {
-        int attempts = 0;
-        while (true)
-        {
-            try
-            {
-                socket = MessagingService.instance().getConnectionPool(to).newSocket();
-                socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
-                output = socket.getOutputStream();
-                input = new DataInputStream(socket.getInputStream());
-                break;
-            }
-            catch (IOException e)
-            {
-                if (++attempts >= MAX_CONNECT_ATTEMPTS)
-                    throw e;
-
-                long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
-                logger.warn("Failed attempt " + attempts + " to connect to " + to + " to stream " + header.file + ". Retrying in " + waitms + " ms. (" + e + ")");
-                Uninterruptibles.sleepUninterruptibly(waitms, TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-
-    protected void close() throws IOException
-    {
-        if (output != null)
-            output.close();
-    }
-
-    public String toString()
-    {
-        return String.format("FileStreamTask(session=%s, to=%s)", header.sessionId, to);
-    }
-}
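
For reference, the Throttle function above converts the configured outbound limit from megabits per second into bytes per millisecond, then splits it evenly across active outbound streams. A standalone sketch of that arithmetic, using 400 Mbit/s and 3 streams as made-up example inputs rather than values read from any config:

    public class ThrottleMath
    {
        public static void main(String[] args)
        {
            int megabitsPerSec = 400; // example outbound limit
            int activeStreams = 3;    // example number of concurrent outbound streams

            // Mbit/s -> bytes/ms: 1024 * 1024 bits per megabit, 8 bits per byte, 1000 ms per second
            int totalBytesPerMs = megabitsPerSec * 1024 * 1024 / 8 / 1000;

            // each active stream gets an equal share of the budget
            int perStreamBytesPerMs = totalBytesPerMs / Math.max(1, activeStreams);

            System.out.println(totalBytesPerMs + " bytes/ms total, " + perStreamBytesPerMs + " bytes/ms per stream");
        }
    }

With these inputs the total budget works out to 52,428 bytes/ms, so each of the three streams is throttled to 17,476 bytes/ms.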

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/IStreamCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IStreamCallback.java b/src/java/org/apache/cassandra/streaming/IStreamCallback.java
deleted file mode 100644
index f0d7754..0000000
--- a/src/java/org/apache/cassandra/streaming/IStreamCallback.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.streaming;
-
-/**
- * Callback interface for streaming session success/failure.
- */
-public interface IStreamCallback
-{
-    /**
-     * called when stream session is finished successfully.
-     */
-    public void onSuccess();
-
-    /**
-     * called when streaming somehow got in trouble.
-     */
-    public void onFailure();
-}
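
Implementations of the interface removed above were typically thin adapters that unblock a thread waiting for the session to finish. A minimal sketch under that assumption; the latch wiring is illustrative, not code from this commit:

    import java.util.concurrent.CountDownLatch;

    public class LatchCallback implements IStreamCallback
    {
        private final CountDownLatch latch = new CountDownLatch(1);

        public void onSuccess()
        {
            latch.countDown();
        }

        public void onFailure()
        {
            // count down either way; the waiter must check session state to distinguish outcomes
            latch.countDown();
        }

        public void await() throws InterruptedException
        {
            latch.await();
        }
    }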

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
deleted file mode 100644
index 92f5c7f..0000000
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Collections;
-
-import com.google.common.base.Throwables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.ning.compress.lzf.LZFInputStream;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.metrics.StreamingMetrics;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.compress.CompressedInputStream;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.BytesReadTracker;
-import org.apache.cassandra.utils.Pair;
-
-public class IncomingStreamReader
-{
-    private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class);
-
-    protected final PendingFile localFile;
-    protected final PendingFile remoteFile;
-    protected final StreamInSession session;
-    private final InputStream underlyingStream;
-    private final StreamingMetrics metrics;
-
-    public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
-    {
-        socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
-        InetAddress host = ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
-        if (header.pendingFiles.isEmpty() && header.file != null)
-        {
-            // StreamInSession should already exist when receiving the second and subsequent files
-            if (!StreamInSession.hasSession(header.sessionId))
-            {
-                StreamReply reply = new StreamReply("", header.sessionId, StreamReply.Status.SESSION_FAILURE);
-                FileStreamTask.sendReply(reply.createMessage(), new DataOutputStream(socket.getOutputStream()));
-                throw new IOException("Session " + header.sessionId + " already closed.");
-            }
-        }
-        session = StreamInSession.get(host, header.sessionId);
-        session.setSocket(socket);
-
-        session.addFiles(header.pendingFiles);
-        // set the current file we are streaming so progress shows up in JMX
-        session.setCurrentFile(header.file);
-        session.setTable(header.table);
-        // map the remote file to a local file with a new context (path) on this node
-        remoteFile = header.file;
-        localFile = remoteFile != null ? StreamIn.getContextMapping(remoteFile) : null;
-
-        if (remoteFile != null)
-        {
-            if (remoteFile.compressionInfo == null)
-                underlyingStream = new LZFInputStream(socket.getInputStream());
-            else
-                underlyingStream = new CompressedInputStream(socket.getInputStream(), remoteFile.compressionInfo);
-        }
-        else
-        {
-            underlyingStream = null;
-        }
-        metrics = StreamingMetrics.get(host);
-    }
-
-    /**
-     * @throws IOException if reading the remote sstable fails. Will throw a RuntimeException if the local write fails.
-     */
-    public void read() throws IOException
-    {
-        if (remoteFile != null)
-        {
-            if (logger.isDebugEnabled())
-            {
-                logger.debug("Receiving stream");
-                logger.debug("Creating file for {} with {} estimated keys",
-                             localFile.getFilename(),
-                             remoteFile.estimatedKeys);
-            }
-
-            assert remoteFile.estimatedKeys > 0;
-            DataInput in = new DataInputStream(underlyingStream);
-            try
-            {
-                SSTableReader reader = streamIn(in, localFile, remoteFile);
-                session.finished(remoteFile, reader);
-            }
-            catch (IOException ex)
-            {
-                retry();
-                throw ex;
-            }
-            catch (RuntimeException e)
-            {
-                // if we encountered unexpected exception, fail this session
-                session.close(false);
-                throw e;
-            }
-        }
-
-        session.closeIfFinished();
-    }
-
-    /**
-     * @throws IOException if reading the remote sstable fails. Will throw a RuntimeException if the local write fails.
-     */
-    private SSTableReader streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException
-    {
-        ColumnFamilyStore cfs = Table.open(localFile.desc.ksname).getColumnFamilyStore(localFile.desc.cfname);
-        DecoratedKey key;
-        SSTableWriter writer = new SSTableWriter(localFile.getFilename(), remoteFile.estimatedKeys);
-        CompactionController controller = new CompactionController(cfs, Collections.<SSTableReader>emptySet(), Integer.MIN_VALUE);
-
-        try
-        {
-            BytesReadTracker in = new BytesReadTracker(input);
-            long totalBytesRead = 0;
-
-            for (Pair<Long, Long> section : localFile.sections)
-            {
-                long length = section.right - section.left;
-                // skip to beginning of section inside chunk
-                if (remoteFile.compressionInfo != null)
-                    ((CompressedInputStream) underlyingStream).position(section.left);
-                long bytesRead = 0;
-                while (bytesRead < length)
-                {
-                    in.reset(0);
-
-                    key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
-                    writer.appendFromStream(key, cfs.metadata, in);
-
-                    cfs.invalidateCachedRow(key);
-
-                    bytesRead += in.getBytesRead();
-                    // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
-                    if (remoteFile.compressionInfo != null)
-                        remoteFile.progress = ((CompressedInputStream) underlyingStream).getTotalCompressedBytesRead();
-                    else
-                        remoteFile.progress += in.getBytesRead();
-                    totalBytesRead += in.getBytesRead();
-                }
-            }
-            StreamingMetrics.totalIncomingBytes.inc(totalBytesRead);
-            metrics.incomingBytes.inc(totalBytesRead);
-            return writer.closeAndOpenReader();
-        }
-        catch (Throwable e)
-        {
-            writer.abort();
-            if (e instanceof IOException)
-                throw (IOException) e;
-            else
-                throw Throwables.propagate(e);
-        }
-        finally
-        {
-            controller.close();
-        }
-    }
-
-    private void retry()
-    {
-        /* Ask the source node to re-stream this file. */
-        session.retry(remoteFile);
-
-        /* Delete the orphaned file. */
-        if (new File(localFile.getFilename()).isFile())
-            FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
-    }
-}
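
The per-row accounting in streamIn() above leans on BytesReadTracker: reset(0) zeroes the counter before each row, and getBytesRead() reports how many bytes that row consumed. A minimal sketch of that contract, assuming BytesReadTracker implements DataInput as its use above suggests:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;

    import org.apache.cassandra.utils.BytesReadTracker;

    public class TrackerDemo
    {
        public static void main(String[] args) throws Exception
        {
            byte[] data = new byte[16]; // made-up payload
            BytesReadTracker in = new BytesReadTracker(new DataInputStream(new ByteArrayInputStream(data)));

            in.reset(0);   // zero the counter, as streamIn() does before each row
            in.readLong(); // consume 8 bytes
            System.out.println(in.getBytesRead()); // prints 8
        }
    }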

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/OperationType.java b/src/java/org/apache/cassandra/streaming/OperationType.java
deleted file mode 100644
index da6e2f6..0000000
--- a/src/java/org/apache/cassandra/streaming/OperationType.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-/**
- * Streaming operation type.
- */
-public enum OperationType
-{
-    AES,
-    BOOTSTRAP,
-    UNBOOTSTRAP,
-    RESTORE_REPLICA_COUNT,
-    BULK_LOAD,
-    REBUILD
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/PendingFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/PendingFile.java b/src/java/org/apache/cassandra/streaming/PendingFile.java
deleted file mode 100644
index 0d9ec35..0000000
--- a/src/java/org/apache/cassandra/streaming/PendingFile.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.streaming.compress.CompressionInfo;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * Represents portions of a file to be streamed between nodes.
- */
-public class PendingFile
-{
-    public static final PendingFileSerializer serializer = new PendingFileSerializer();
-
-    // NB: this reference is kept so that the acquired reference can be released upon completion
-    public final SSTableReader sstable;
-
-    public final Descriptor desc;
-    public final String component;
-    public final List<Pair<Long, Long>> sections;
-    public final OperationType type;
-    /** total length of data to transfer */
-    public final long size;
-    /** estimated number of keys to transfer */
-    public final long estimatedKeys;
-    /** compression information. null if data is not compressed */
-    public final CompressionInfo compressionInfo;
-    public long progress;
-
-    public PendingFile(Descriptor desc, PendingFile pf)
-    {
-        this(null, desc, pf.component, pf.sections, pf.type, pf.estimatedKeys, pf.compressionInfo);
-    }
-
-    public PendingFile(SSTableReader sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type)
-    {
-        this(sstable, desc, component, sections, type, 0, null);
-    }
-
-    public PendingFile(SSTableReader sstable,
-                       Descriptor desc,
-                       String component,
-                       List<Pair<Long,Long>> sections,
-                       OperationType type,
-                       long estimatedKeys,
-                       CompressionInfo compressionInfo)
-    {
-        this.sstable = sstable;
-        this.desc = desc;
-        this.component = component;
-        this.sections = sections;
-        this.type = type;
-
-        long tempSize = 0;
-        if (compressionInfo == null)
-        {
-            for (Pair<Long, Long> section : sections)
-                tempSize += section.right - section.left;
-        }
-        else
-        {
-            // calculate the total length of the chunks to transfer
-            for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-                tempSize += chunk.length + 4; // 4 bytes for CRC
-        }
-        size = tempSize;
-
-        this.estimatedKeys = estimatedKeys;
-        this.compressionInfo = compressionInfo;
-    }
-
-    public String getFilename()
-    {
-        return desc.filenameFor(component);
-    }
-
-    public boolean equals(Object o)
-    {
-        if (!(o instanceof PendingFile))
-            return false;
-
-        PendingFile rhs = (PendingFile)o;
-        return getFilename().equals(rhs.getFilename());
-    }
-
-    public int hashCode()
-    {
-        return getFilename().hashCode();
-    }
-
-    public String toString()
-    {
-        return getFilename() + " sections=" + sections.size() + " progress=" + progress + "/" + size + " - " + progress*100/size + "%";
-    }
-
-    public static class PendingFileSerializer implements IVersionedSerializer<PendingFile>
-    {
-        public void serialize(PendingFile sc, DataOutput out, int version) throws IOException
-        {
-            if (sc == null)
-            {
-                out.writeUTF("");
-                return;
-            }
-
-            out.writeUTF(sc.desc.filenameFor(sc.component));
-            out.writeUTF(sc.component);
-            out.writeInt(sc.sections.size());
-            for (Pair<Long,Long> section : sc.sections)
-            {
-                out.writeLong(section.left);
-                out.writeLong(section.right);
-            }
-            out.writeUTF(sc.type.name());
-            out.writeLong(sc.estimatedKeys);
-            CompressionInfo.serializer.serialize(sc.compressionInfo, out, version);
-        }
-
-        public PendingFile deserialize(DataInput in, int version) throws IOException
-        {
-            String filename = in.readUTF();
-            if (filename.isEmpty())
-                return null;
-
-            Descriptor desc = Descriptor.fromFilename(filename);
-            String component = in.readUTF();
-            int count = in.readInt();
-            List<Pair<Long,Long>> sections = new ArrayList<Pair<Long,Long>>(count);
-            for (int i = 0; i < count; i++)
-                sections.add(Pair.create(in.readLong(), in.readLong()));
-            // this controls the way indexes are rebuilt when streaming in.
-            OperationType type = OperationType.valueOf(in.readUTF());
-            long estimatedKeys = in.readLong();
-            CompressionInfo info = CompressionInfo.serializer.deserialize(in, version);
-            return new PendingFile(null, desc, component, sections, type, estimatedKeys, info);
-        }
-
-        public long serializedSize(PendingFile pf, int version)
-        {
-            if (pf == null)
-                return TypeSizes.NATIVE.sizeof("");
-
-            long size = TypeSizes.NATIVE.sizeof(pf.desc.filenameFor(pf.component));
-            size += TypeSizes.NATIVE.sizeof(pf.component);
-            size += TypeSizes.NATIVE.sizeof(pf.sections.size());
-            for (Pair<Long,Long> section : pf.sections)
-                size += TypeSizes.NATIVE.sizeof(section.left) + TypeSizes.NATIVE.sizeof(section.right);
-            size += TypeSizes.NATIVE.sizeof(pf.type.name());
-            size += TypeSizes.NATIVE.sizeof(pf.estimatedKeys);
-            size += CompressionInfo.serializer.serializedSize(pf.compressionInfo, version);
-            return size;
-        }
-    }
-}
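
The PendingFile constructor above derives size two ways: for uncompressed data it is the sum of section lengths, while for compressed data it is the sum of chunk lengths plus a 4-byte CRC per chunk. The same arithmetic in isolation, with made-up sections and chunks:

    import java.util.Arrays;
    import java.util.List;

    public class PendingFileSize
    {
        public static void main(String[] args)
        {
            // uncompressed: sum of (right - left) over the sections
            List<long[]> sections = Arrays.asList(new long[]{ 0, 1000 }, new long[]{ 2000, 2500 });
            long uncompressed = 0;
            for (long[] section : sections)
                uncompressed += section[1] - section[0];

            // compressed: sum of chunk lengths, plus 4 bytes of CRC per chunk
            int[] chunkLengths = { 600, 480 };
            long compressed = 0;
            for (int length : chunkLengths)
                compressed += length + 4;

            System.out.println("uncompressed=" + uncompressed + ", compressed=" + compressed); // 1500 and 1088
        }
    }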

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/ProgressInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
new file mode 100644
index 0000000..d308ed0
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+
+import com.google.common.base.Objects;
+
+/**
+ * ProgressInfo contains file transfer progress.
+ */
+public class ProgressInfo implements Serializable
+{
+    /**
+     * Direction of the stream.
+     */
+    public static enum Direction
+    {
+        OUT(0),
+        IN(1);
+
+        public final byte code;
+
+        private Direction(int code)
+        {
+            this.code = (byte) code;
+        }
+
+        public static Direction fromByte(byte direction)
+        {
+            return direction == 0 ? OUT : IN;
+        }
+    }
+
+    public final InetAddress peer;
+    public final String fileName;
+    public final Direction direction;
+    public final long currentBytes;
+    public final long totalBytes;
+
+    public ProgressInfo(InetAddress peer, String fileName, Direction direction, long currentBytes, long totalBytes)
+    {
+        assert totalBytes > 0;
+
+        this.peer = peer;
+        this.fileName = fileName;
+        this.direction = direction;
+        this.currentBytes = currentBytes;
+        this.totalBytes = totalBytes;
+    }
+
+    /**
+     * @return true if file transfer is completed
+     */
+    public boolean isCompleted()
+    {
+        return currentBytes == totalBytes;
+    }
+
+    /**
+     * Two ProgressInfo instances are considered equal only when all attributes except currentBytes are equal.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ProgressInfo that = (ProgressInfo) o;
+
+        if (totalBytes != that.totalBytes) return false;
+        if (direction != that.direction) return false;
+        if (!fileName.equals(that.fileName)) return false;
+        return peer.equals(that.peer);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(peer, fileName, direction, totalBytes);
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder(fileName);
+        sb.append(" ").append(currentBytes);
+        sb.append("/").append(totalBytes).append(" bytes");
+        sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
+        sb.append(direction == Direction.OUT ? "sent to " : "received from ");
+        sb.append(peer);
+        return sb.toString();
+    }
+}
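
Because equals() and hashCode() above deliberately ignore currentBytes, successive progress updates for the same file collapse to a single entry in hash-based collections. A small demonstration; the peer address and file name are placeholders:

    import java.net.InetAddress;
    import java.util.HashSet;
    import java.util.Set;

    public class ProgressEquality
    {
        public static void main(String[] args) throws Exception
        {
            InetAddress peer = InetAddress.getByName("127.0.0.1"); // placeholder peer
            ProgressInfo early = new ProgressInfo(peer, "example-Data.db", ProgressInfo.Direction.OUT, 10, 100);
            ProgressInfo late  = new ProgressInfo(peer, "example-Data.db", ProgressInfo.Direction.OUT, 90, 100);

            Set<ProgressInfo> set = new HashSet<>();
            set.add(early);
            set.add(late); // no-op: equal to 'early' despite the different currentBytes

            System.out.println(set.size()); // prints 1
        }
    }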

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java
new file mode 100644
index 0000000..9532041
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+/**
+ * Stream session info.
+ */
+public final class SessionInfo implements Serializable
+{
+    public final InetAddress peer;
+    /** Immutable collection of receiving summaries */
+    public final Collection<StreamSummary> receivingSummaries;
+    /** Immutable collection of sending summaries */
+    public final Collection<StreamSummary> sendingSummaries;
+    /** Current session state */
+    public final StreamSession.State state;
+
+    private final Map<String, ProgressInfo> receivingFiles;
+    private final Map<String, ProgressInfo> sendingFiles;
+
+    public SessionInfo(InetAddress peer,
+                       Collection<StreamSummary> receivingSummaries,
+                       Collection<StreamSummary> sendingSummaries,
+                       StreamSession.State state)
+    {
+        this.peer = peer;
+        this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
+        this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
+        this.receivingFiles = new HashMap<>();
+        this.sendingFiles = new HashMap<>();
+        this.state = state;
+    }
+
+    public boolean isFailed()
+    {
+        return state == StreamSession.State.FAILED;
+    }
+
+    /**
+     * Update progress of receiving/sending file.
+     *
+     * @param newProgress new progress info
+     */
+    public synchronized void updateProgress(ProgressInfo newProgress)
+    {
+        assert peer.equals(newProgress.peer);
+
+        Map<String, ProgressInfo> currentFiles = newProgress.direction == ProgressInfo.Direction.IN
+                                                    ? receivingFiles : sendingFiles;
+        currentFiles.put(newProgress.fileName, newProgress);
+    }
+
+    public Collection<ProgressInfo> getReceivingFiles()
+    {
+        return receivingFiles.values();
+    }
+
+    public Collection<ProgressInfo> getSendingFiles()
+    {
+        return sendingFiles.values();
+    }
+
+    /**
+     * @return total number of files already received.
+     */
+    public long getTotalFilesReceived()
+    {
+        return getTotalFilesCompleted(receivingFiles.values());
+    }
+
+    /**
+     * @return total number of files already sent.
+     */
+    public long getTotalFilesSent()
+    {
+        return getTotalFilesCompleted(sendingFiles.values());
+    }
+
+    /**
+     * @return total size (in bytes) already received.
+     */
+    public long getTotalSizeReceived()
+    {
+        return getTotalSizeInProgress(receivingFiles.values());
+    }
+
+    /**
+     * @return total size (in bytes) already sent.
+     */
+    public long getTotalSizeSent()
+    {
+        return getTotalSizeInProgress(sendingFiles.values());
+    }
+
+    /**
+     * @return total number of files to receive in the session
+     */
+    public long getTotalFilesToReceive()
+    {
+        return getTotalFiles(receivingSummaries);
+    }
+
+    /**
+     * @return total number of files to send in the session
+     */
+    public long getTotalFilesToSend()
+    {
+        return getTotalFiles(sendingSummaries);
+    }
+
+    /**
+     * @return total size (in bytes) to receive in the session
+     */
+    public long getTotalSizeToReceive()
+    {
+        return getTotalSizes(receivingSummaries);
+    }
+
+    /**
+     * @return total size (in bytes) to send in the session
+     */
+    public long getTotalSizeToSend()
+    {
+        return getTotalSizes(sendingSummaries);
+    }
+
+    private long getTotalSizeInProgress(Collection<ProgressInfo> files)
+    {
+        long total = 0;
+        for (ProgressInfo file : files)
+            total += file.currentBytes;
+        return total;
+    }
+
+    private long getTotalFiles(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.files;
+        return total;
+    }
+
+    private long getTotalSizes(Collection<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.totalSize;
+        return total;
+    }
+
+    private long getTotalFilesCompleted(Collection<ProgressInfo> files)
+    {
+        Iterable<ProgressInfo> completed = Iterables.filter(files, new Predicate<ProgressInfo>()
+        {
+            public boolean apply(ProgressInfo input)
+            {
+                return input.isCompleted();
+            }
+        });
+        return Iterables.size(completed);
+    }
+}
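
Progress tracking above is per file name: updateProgress() replaces the stored entry for that file, and the aggregate getters fold over whatever entries are present. A hedged usage sketch; the peer and file name are placeholders, the summaries are left empty, and FAILED is used solely because it is the only StreamSession.State value visible in this excerpt:

    import java.net.InetAddress;
    import java.util.Collections;

    public class SessionInfoDemo
    {
        public static void main(String[] args) throws Exception
        {
            InetAddress peer = InetAddress.getByName("127.0.0.1");
            SessionInfo info = new SessionInfo(peer,
                                               Collections.<StreamSummary>emptyList(),
                                               Collections.<StreamSummary>emptyList(),
                                               StreamSession.State.FAILED);

            // two updates for the same file: the second replaces the first
            info.updateProgress(new ProgressInfo(peer, "example-Data.db", ProgressInfo.Direction.IN, 50, 100));
            info.updateProgress(new ProgressInfo(peer, "example-Data.db", ProgressInfo.Direction.IN, 100, 100));

            System.out.println(info.getTotalFilesReceived()); // 1, since the file is complete
            System.out.println(info.getTotalSizeReceived());  // 100
        }
    }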

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java
new file mode 100644
index 0000000..9af1fbd
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+public abstract class StreamEvent
+{
+    public static enum Type
+    {
+        STREAM_PREPARED,
+        STREAM_COMPLETE,
+        FILE_PROGRESS,
+    }
+
+    public final Type eventType;
+    public final UUID planId;
+
+    protected StreamEvent(Type eventType, UUID planId)
+    {
+        this.eventType = eventType;
+        this.planId = planId;
+    }
+
+    public static class SessionCompleteEvent extends StreamEvent
+    {
+        public final InetAddress peer;
+        public final boolean success;
+
+        public SessionCompleteEvent(StreamSession session)
+        {
+            super(Type.STREAM_COMPLETE, session.planId());
+            this.peer = session.peer;
+            this.success = session.isSuccess();
+        }
+    }
+
+    public static class ProgressEvent extends StreamEvent
+    {
+        public final ProgressInfo progress;
+
+        public ProgressEvent(UUID planId, ProgressInfo progress)
+        {
+            super(Type.FILE_PROGRESS, planId);
+            this.progress = progress;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "<ProgressEvent " + progress.toString() + ">";
+        }
+    }
+
+    public static class SessionPreparedEvent extends StreamEvent
+    {
+        public final SessionInfo session;
+
+        public SessionPreparedEvent(UUID planId, SessionInfo session)
+        {
+            super(Type.STREAM_PREPARED, planId);
+            this.session = session;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamEventHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamEventHandler.java b/src/java/org/apache/cassandra/streaming/StreamEventHandler.java
new file mode 100644
index 0000000..e2e84bb
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamEventHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import com.google.common.util.concurrent.FutureCallback;
+
+public interface StreamEventHandler extends FutureCallback<StreamState>
+{
+    /**
+     * Callback for various streaming events.
+     *
+     * @see StreamEvent.Type
+     * @param event Stream event.
+     */
+    void handleStreamEvent(StreamEvent event);
+}
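
A concrete handler implements the event callback declared above plus the onSuccess/onFailure pair inherited from Guava's FutureCallback&lt;StreamState&gt;. A minimal logging sketch; it relies only on the StreamEvent types shown above and treats StreamState as opaque:

    public class LoggingStreamEventHandler implements StreamEventHandler
    {
        public void handleStreamEvent(StreamEvent event)
        {
            switch (event.eventType)
            {
                case STREAM_PREPARED:
                    System.out.println("prepared: " + ((StreamEvent.SessionPreparedEvent) event).session.peer);
                    break;
                case FILE_PROGRESS:
                    System.out.println(((StreamEvent.ProgressEvent) event).progress);
                    break;
                case STREAM_COMPLETE:
                    StreamEvent.SessionCompleteEvent complete = (StreamEvent.SessionCompleteEvent) event;
                    System.out.println("session with " + complete.peer + (complete.success ? " succeeded" : " failed"));
                    break;
            }
        }

        public void onSuccess(StreamState finalState)
        {
            System.out.println("plan finished: " + finalState);
        }

        public void onFailure(Throwable t)
        {
            t.printStackTrace();
        }
    }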

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamException.java b/src/java/org/apache/cassandra/streaming/StreamException.java
new file mode 100644
index 0000000..6e22db2
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+public class StreamException extends Throwable
+{
+    public final StreamState finalState;
+
+    public StreamException(StreamState finalState, String message)
+    {
+        super(message);
+        this.finalState = finalState;
+    }
+
+    public StreamException(StreamState finalState, String message, Throwable cause)
+    {
+        super(message, cause);
+        this.finalState = finalState;
+    }
+}
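
Note that StreamException extends Throwable directly rather than Exception, so a generic catch (Exception e) will not see it; callers must catch it explicitly. A tiny sketch, where the null final state is used purely for illustration:

    public class StreamExceptionDemo
    {
        public static void main(String[] args)
        {
            try
            {
                throw new StreamException(null, "stream failed");
            }
            catch (StreamException e)
            {
                // the state of the plan at failure time travels with the exception
                System.out.println(e.getMessage() + ", finalState=" + e.finalState);
            }
        }
    }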

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamHeader.java b/src/java/org/apache/cassandra/streaming/StreamHeader.java
deleted file mode 100644
index a0bf832..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamHeader.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.*;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-public class StreamHeader
-{
-    public static final IVersionedSerializer<StreamHeader> serializer = new StreamHeaderSerializer();
-
-    public final String table;
-
-    /** file being sent on initial stream */
-    public final PendingFile file;
-
-    /** session ID */
-    public final UUID sessionId;
-
-    /** files to add to the session */
-    public final Collection<PendingFile> pendingFiles;
-
-    public StreamHeader(String table, UUID sessionId, PendingFile file)
-    {
-        this(table, sessionId, file, Collections.<PendingFile>emptyList());
-    }
-
-    public StreamHeader(String table, UUID sessionId, PendingFile first, Collection<PendingFile> pendingFiles)
-    {
-        this.table = table;
-        this.sessionId  = sessionId;
-        this.file = first;
-        this.pendingFiles = pendingFiles;
-    }
-
-    private static class StreamHeaderSerializer implements IVersionedSerializer<StreamHeader>
-    {
-        public void serialize(StreamHeader sh, DataOutput out, int version) throws IOException
-        {
-            out.writeUTF(sh.table);
-            UUIDSerializer.serializer.serialize(sh.sessionId, out, MessagingService.current_version);
-            PendingFile.serializer.serialize(sh.file, out, version);
-            out.writeInt(sh.pendingFiles.size());
-            for (PendingFile file : sh.pendingFiles)
-                PendingFile.serializer.serialize(file, out, version);
-        }
-
-        public StreamHeader deserialize(DataInput in, int version) throws IOException
-        {
-            String table = in.readUTF();
-            UUID sessionId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
-            PendingFile file = PendingFile.serializer.deserialize(in, version);
-            int size = in.readInt();
-
-            List<PendingFile> pendingFiles = new ArrayList<PendingFile>(size);
-            for (int i = 0; i < size; i++)
-                pendingFiles.add(PendingFile.serializer.deserialize(in, version));
-            return new StreamHeader(table, sessionId, file, pendingFiles);
-        }
-
-        public long serializedSize(StreamHeader sh, int version)
-        {
-            long size = TypeSizes.NATIVE.sizeof(sh.table);
-            size += TypeSizes.NATIVE.sizeof(sh.sessionId);
-            size += PendingFile.serializer.serializedSize(sh.file, version);
-            size += TypeSizes.NATIVE.sizeof(sh.pendingFiles.size());
-            for (PendingFile file : sh.pendingFiles)
-                size += PendingFile.serializer.serializedSize(file, version);
-            return size;
-       }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamIn.java b/src/java/org/apache/cassandra/streaming/StreamIn.java
deleted file mode 100644
index 588728c..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamIn.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.net.InetAddress;
-import java.util.Collection;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * For streaming data from other nodes into this one.
- * Sends a STREAM_REQUEST message to the source node(s), after which StreamOut on that side takes over.
- * See StreamOut for details.
- */
-public class StreamIn
-{
-    private static final Logger logger = LoggerFactory.getLogger(StreamIn.class);
-
-    /** Request ranges for all column families in the given keyspace. */
-    public static void requestRanges(InetAddress source, String tableName, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
-    {
-        requestRanges(source, tableName, Table.open(tableName).getColumnFamilyStores(), ranges, callback, type);
-    }
-
-    /**
-     * Request ranges to be transferred from specific CFs
-     */
-    public static void requestRanges(InetAddress source, String tableName, Collection<ColumnFamilyStore> columnFamilies, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
-    {
-        assert ranges.size() > 0;
-
-        if (logger.isDebugEnabled())
-            logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
-        StreamInSession session = StreamInSession.create(source, callback);
-        StreamRequest srm = new StreamRequest(FBUtilities.getBroadcastAddress(),
-                                                            ranges,
-                                                            tableName,
-                                                            columnFamilies,
-                                                            session.getSessionId(),
-                                                            type);
-        MessagingService.instance().sendOneWay(srm.createMessage(), source);
-    }
-
-    /** Translates remote files to local files by creating a local sstable per remote sstable. */
-    public static PendingFile getContextMapping(PendingFile remote)
-    {
-        /* Create a local sstable for each remote sstable */
-        Descriptor remotedesc = remote.desc;
-        if (!remotedesc.isStreamCompatible())
-            throw new UnsupportedOperationException(String.format("SSTable %s is not compatible with current version %s",
-                                                                  remote.getFilename(), Descriptor.Version.CURRENT));
-
-        // new local sstable
-        Table table = Table.open(remotedesc.ksname);
-        ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname);
-        Directories.DataDirectory localDir = cfStore.directories.getLocationCapableOfSize(remote.size);
-        if (localDir == null)
-            throw new RuntimeException("Insufficient disk space to store " + remote.size + " bytes");
-        Descriptor localdesc = Descriptor.fromFilename(cfStore.getTempSSTablePath(cfStore.directories.getLocationForDisk(localDir)));
-
-        return new PendingFile(localdesc, remote);
-    }
-}
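
On the requesting side, the removed API boiled down to a single requestRanges() call plus a callback. A hedged sketch of that pattern; the keyspace name is a placeholder and the ranges are assumed to be built elsewhere from the partitioner's tokens:

    import java.net.InetAddress;
    import java.util.Collection;
    import java.util.concurrent.CountDownLatch;

    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;

    public class RangeFetcher
    {
        public static void fetch(InetAddress source, Collection<Range<Token>> ranges) throws InterruptedException
        {
            final CountDownLatch latch = new CountDownLatch(1);
            StreamIn.requestRanges(source, "Keyspace1", ranges, new IStreamCallback()
            {
                public void onSuccess() { latch.countDown(); }
                public void onFailure() { latch.countDown(); } // outcome checked separately
            }, OperationType.BOOTSTRAP);
            latch.await(); // block until the stream session finishes either way
        }
    }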

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
deleted file mode 100644
index 370183f..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.util.*;
-import java.util.concurrent.ConcurrentMap;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDGen;
-
-/** Each context gets its own StreamInSession, so there may be more than one session per host. */
-public class StreamInSession extends AbstractStreamSession
-{
-    private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);
-
-    private static final ConcurrentMap<UUID, StreamInSession> sessions = new NonBlockingHashMap<UUID, StreamInSession>();
-
-    private final Set<PendingFile> files = new NonBlockingHashSet<PendingFile>();
-    private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
-    private PendingFile current;
-    private Socket socket;
-    private volatile int retries;
-
-    private StreamInSession(InetAddress host, UUID sessionId, IStreamCallback callback)
-    {
-        super(null, host, sessionId, callback);
-    }
-
-    public static StreamInSession create(InetAddress host, IStreamCallback callback)
-    {
-        StreamInSession session = new StreamInSession(host, UUIDGen.getTimeUUID(), callback);
-        sessions.put(session.getSessionId(), session);
-        return session;
-    }
-
-    public static StreamInSession get(InetAddress host, UUID sessionId)
-    {
-        StreamInSession session = sessions.get(sessionId);
-        if (session == null)
-        {
-            StreamInSession possibleNew = new StreamInSession(host, sessionId, null);
-            if ((session = sessions.putIfAbsent(sessionId, possibleNew)) == null)
-                session = possibleNew;
-        }
-        return session;
-    }
-
-    public static boolean hasSession(UUID sessionId)
-    {
-        return sessions.get(sessionId) != null;
-    }
-
-    public void setCurrentFile(PendingFile file)
-    {
-        this.current = file;
-    }
-
-    public void setTable(String table)
-    {
-        this.table = table;
-    }
-
-    public void setSocket(Socket socket)
-    {
-        this.socket = socket;
-    }
-
-    public void addFiles(Collection<PendingFile> files)
-    {
-        for (PendingFile file : files)
-        {
-            if (logger.isDebugEnabled())
-                logger.debug("Adding file {} to Stream Request queue", file.getFilename());
-            this.files.add(file);
-        }
-    }
-
-    public void finished(PendingFile remoteFile, SSTableReader reader) throws IOException
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("Finished {} (from {}). Sending ack to {}", new Object[] {remoteFile, getHost(), this});
-
-        assert reader != null;
-        readers.add(reader);
-        files.remove(remoteFile);
-        if (remoteFile.equals(current))
-            current = null;
-        StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_FINISHED);
-        // send a StreamStatus message telling the source node it can delete this file
-        sendMessage(reply.createMessage());
-        logger.debug("ack {} sent for {}", reply, remoteFile);
-    }
-
-    public void retry(PendingFile remoteFile)
-    {
-        retries++;
-        if (retries > DatabaseDescriptor.getMaxStreamingRetries())
-        {
-            logger.error(String.format("Failed streaming session %s from %s while receiving %s", getSessionId(), getHost().toString(), current),
-                         new IllegalStateException("Too many retries for " + remoteFile));
-            close(false);
-            return;
-        }
-        StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY);
-        logger.info("Streaming of file {} for {} failed: requesting a retry.", remoteFile, this);
-        try
-        {
-            sendMessage(reply.createMessage());
-        }
-        catch (IOException e)
-        {
-            logger.error("Sending retry message failed, closing session.", e);
-            close(false);
-        }
-    }
-
-    public void sendMessage(MessageOut<StreamReply> message) throws IOException
-    {
-        DataOutputStream out = new DataOutputStream(socket.getOutputStream());
-        FileStreamTask.sendReply(message, out);
-        out.flush();
-    }
-
-    public void closeIfFinished() throws IOException
-    {
-        if (files.isEmpty())
-        {
-            HashMap<ColumnFamilyStore, List<SSTableReader>> cfstores = new HashMap<ColumnFamilyStore, List<SSTableReader>>();
-            try
-            {
-                for (SSTableReader sstable : readers)
-                {
-                    assert sstable.getTableName().equals(table);
-
-                    // Acquire the reference (for secondary index building) before submitting the index build,
-                    // so it can't get compacted out of existence in between
-                    if (!sstable.acquireReference())
-                        throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred");
-
-                    ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
-                    if (!cfstores.containsKey(cfs))
-                        cfstores.put(cfs, new ArrayList<SSTableReader>());
-                    cfstores.get(cfs).add(sstable);
-                }
-
-                // add sstables and build secondary indexes
-                for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())
-                {
-                    if (entry.getKey() != null)
-                    {
-                        entry.getKey().addSSTables(entry.getValue());
-                        entry.getKey().indexManager.maybeBuildSecondaryIndexes(entry.getValue(), entry.getKey().indexManager.allIndexesNames());
-                    }
-                }
-            }
-            finally
-            {
-                for (List<SSTableReader> referenced : cfstores.values())
-                    SSTableReader.releaseReferences(referenced);
-            }
-
-            // send reply to source that we're done
-            StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FINISHED);
-            logger.info("Finished streaming session {} from {}", getSessionId(), getHost());
-            try
-            {
-                if (socket != null)
-                    FileStreamTask.sendReply(reply.createMessage(),
-                                             new DataOutputStream(socket.getOutputStream()));
-                else
-                    logger.debug("No socket to reply to {} with!", getHost());
-            }
-            finally
-            {
-                if (socket != null)
-                    socket.close();
-            }
-
-            close(true);
-        }
-    }
-
-    protected void closeInternal(boolean success)
-    {
-        sessions.remove(sessionId);
-        if (!success && FailureDetector.instance.isAlive(getHost()))
-        {
-            StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FAILURE);
-            MessagingService.instance().sendOneWay(reply.createMessage(), getHost());
-        }
-    }
-
-    /** query method to determine which hosts are streaming to this node. */
-    public static Set<InetAddress> getSources()
-    {
-        HashSet<InetAddress> set = new HashSet<InetAddress>();
-        for (StreamInSession session : sessions.values())
-        {
-            set.add(session.getHost());
-        }
-        return set;
-    }
-
-    /** query the status of incoming files. */
-    public static Set<PendingFile> getIncomingFiles(InetAddress host)
-    {
-        Set<PendingFile> set = new HashSet<PendingFile>();
-        for (Map.Entry<UUID, StreamInSession> entry : sessions.entrySet())
-        {
-            StreamInSession session = entry.getValue();
-            if (session.getHost().equals(host))
-            {
-                if (session.current != null)
-                    set.add(session.current);
-                set.addAll(session.files);
-            }
-        }
-        return set;
-    }
-}

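StreamInSession.get() above is a lock-free get-or-create: check the map, optimistically construct a candidate, and let putIfAbsent arbitrate the race. The same idiom as a self-contained sketch (ConcurrentHashMap stands in for NonBlockingHashMap purely for illustration):

    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class SessionRegistrySketch
    {
        private static final ConcurrentMap<UUID, SessionRegistrySketch> sessions =
            new ConcurrentHashMap<UUID, SessionRegistrySketch>();

        public static SessionRegistrySketch get(UUID sessionId)
        {
            SessionRegistrySketch session = sessions.get(sessionId);
            if (session == null)
            {
                SessionRegistrySketch candidate = new SessionRegistrySketch();
                // putIfAbsent returns the previous mapping if another thread won
                // the race, or null if our candidate was installed as canonical
                SessionRegistrySketch existing = sessions.putIfAbsent(sessionId, candidate);
                session = existing == null ? candidate : existing;
            }
            return session;
        }
    }

A losing thread simply discards its candidate; because construction has no side effects here, the wasted allocation is harmless.
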
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
new file mode 100644
index 0000000..d64cc65
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+/**
+ * StreamManager manages currently running {@link StreamResultFuture}s and provides the status of all operations invoked.
+ *
+ * All stream operations should be created through this class so that their status and progress can be tracked.
+ */
+public class StreamManager implements StreamManagerMBean
+{
+    public static final StreamManager instance = new StreamManager();
+
+    private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE);
+
+    /**
+     * Gets the streaming rate limiter.
+     * When stream_throughput_outbound_megabits_per_sec is 0, this returns a rate limiter
+     * with a rate of Double.MAX_VALUE bytes per second (effectively unthrottled).
+     * The rate unit is bytes per second.
+     *
+     * @return RateLimiter with rate limit set
+     */
+    public static RateLimiter getRateLimiter()
+    {
+        double currentThroughput = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * 1024.0 * 1024 / 8; // megabits/s -> bytes/s, the unit RateLimiter expects
+        // if throughput is set to 0, throttling is disabled
+        if (currentThroughput == 0)
+            currentThroughput = Double.MAX_VALUE;
+        if (limiter.getRate() != currentThroughput)
+            limiter.setRate(currentThroughput);
+        return limiter;
+    }
+
+    /** Currently running stream plans. Removed after completion/failure. */
+    private final Map<UUID, StreamResultFuture> currentStreams = new NonBlockingHashMap<>();
+
+    public Set<StreamState> getCurrentStreams()
+    {
+        return Sets.newHashSet(Iterables.transform(currentStreams.values(), new Function<StreamResultFuture, StreamState>()
+        {
+            public StreamState apply(StreamResultFuture input)
+            {
+                return input.getCurrentState();
+            }
+        }));
+    }
+
+    public void register(final StreamResultFuture result)
+    {
+        // Make sure we remove the stream on completion (whether successful or not)
+        result.addListener(new Runnable()
+        {
+            public void run()
+            {
+                currentStreams.remove(result.planId);
+            }
+        }, MoreExecutors.sameThreadExecutor());
+
+        currentStreams.put(result.planId, result);
+    }
+}

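Since getRateLimiter() documents its rate in bytes per second, the natural use is one permit per byte written. A standalone sketch of that pattern with Guava's RateLimiter; the 200 megabit/s cap and 64 KB chunk size are illustrative numbers, not values from this patch:

    import com.google.common.util.concurrent.RateLimiter;

    public class ThrottleSketch
    {
        public static void main(String[] args)
        {
            int megabitsPerSec = 200; // cf. stream_throughput_outbound_megabits_per_sec
            // megabits -> bytes: 1024 * 1024 bits per megabit, 8 bits per byte
            double bytesPerSec = megabitsPerSec * 1024.0 * 1024 / 8;
            RateLimiter limiter = RateLimiter.create(bytesPerSec);

            byte[] chunk = new byte[64 * 1024];
            for (int i = 0; i < 10; i++)
            {
                // blocks until sending this chunk stays under the cap
                limiter.acquire(chunk.length);
                // ... write chunk to the socket here ...
            }
        }
    }
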
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
new file mode 100644
index 0000000..eb6f6ae
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+public interface StreamManagerMBean
+{
+    public static final String OBJECT_NAME = "org.apache.cassandra.net:type=StreamManager";
+
+    /**
+     * Returns the current state of all ongoing streams.
+     */
+    Set<StreamState> getCurrentStreams();
+}

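Because the MBean is registered under a fixed ObjectName, ongoing streams can be inspected from any JMX client. A sketch of a remote query, assuming JMX is reachable on Cassandra's conventional port 7199; whether StreamState marshals cleanly over the connector is outside the scope of this hunk:

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.cassandra.streaming.StreamManagerMBean;
    import org.apache.cassandra.streaming.StreamState;

    public class StreamStatusSketch
    {
        public static void main(String[] args) throws Exception
        {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
            JMXConnector connector = JMXConnectorFactory.connect(url);
            try
            {
                MBeanServerConnection connection = connector.getMBeanServerConnection();
                ObjectName name = new ObjectName(StreamManagerMBean.OBJECT_NAME);
                StreamManagerMBean proxy = JMX.newMBeanProxy(connection, name, StreamManagerMBean.class);
                for (StreamState state : proxy.getCurrentStreams())
                    System.out.println(state);
            }
            finally
            {
                connector.close();
            }
        }
    }
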
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
deleted file mode 100644
index 709741b..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.Future;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.streaming.compress.CompressionInfo;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * This class handles streaming data from one node to another.
- *
- * The source node [the Out side] is always in charge of the streaming session.  Streams may
- * be initiated either directly by the source via the methods in this class,
- * or on demand from the target (via StreamRequest).
- *
- * Files to stream are grouped into sessions, which can have callbacks associated
- * with them so that (for instance) we can mark a new node a full member of the
- * cluster after all the data it needs has been streamed.
- *
- * The source begins a session by sending
- * a Message with the stream bit flag in the Header turned on.  Part of that Message
- * will include a StreamHeader that includes the files that will be streamed as part
- * of that session, as well as the first file-to-be-streamed. (Combining session list
- * and first file like this is inconvenient, but not as inconvenient as the old
- * three-part send-file-list, wait-for-ack, start-first-file dance.)
- *
- * This is done over a separate TCP connection to avoid blocking ordinary intra-node
- * traffic during the stream.  So there is no Handler for the main stream of data --
- * when a connection sets the Stream bit, IncomingTcpConnection knows what to expect
- * without any further Messages.
- *
- * After each file, the target node [the In side] will send a StreamReply indicating success
- * (FILE_FINISHED) or failure (FILE_RETRY).
- *
- * When all files have been successfully transferred and integrated the target will
- * send an additional SESSION_FINISHED reply and the session is complete.
- *
- * For Stream requests (for bootstrap), one subtlety is that we always have to
- * create at least one stream reply, even if the list of files is empty, otherwise the
- * target has no way to know that it can stop waiting for an answer.
- *
- */
-public class StreamOut
-{
-    private static final Logger logger = LoggerFactory.getLogger(StreamOut.class);
-
-    /**
-     * Stream the given ranges to the target endpoint from each CF in the given keyspace.
-     */
-    public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
-    {
-        transferRanges(target, table, table.getColumnFamilyStores(), ranges, callback, type);
-    }
-
-    /**
-     * Stream the given ranges to the target endpoint for provided CFs in the given keyspace.
-     */
-    public static void transferRanges(InetAddress target, Table table, Iterable<ColumnFamilyStore> cfses, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
-    {
-        StreamOutSession session = StreamOutSession.create(table.getName(), target, callback);
-        transferRanges(session, cfses, ranges, type);
-    }
-
-    /**
-     * Flushes the memtables of the given column family stores and waits for
-     * all of the flushes to complete.
-     */
-    private static void flushSSTables(Iterable<ColumnFamilyStore> stores)
-    {
-        logger.info("Flushing memtables for {}...", stores);
-        List<Future<?>> flushes = new ArrayList<Future<?>>();
-        for (ColumnFamilyStore cfstore : stores)
-            flushes.add(cfstore.forceFlush());
-        FBUtilities.waitOnFutures(flushes);
-    }
-
-    /**
-     * Stream the given ranges to the target endpoint from each of the given CFs.
-     */
-    public static void transferRanges(StreamOutSession session, Iterable<ColumnFamilyStore> cfses, Collection<Range<Token>> ranges, OperationType type)
-    {
-        transferRanges(session, cfses, ranges, type, true);
-    }
-
-    /**
-     * Stream the given ranges to the target endpoint from each of the given CFs.
-     */
-    public static void transferRanges(StreamOutSession session,
-                                      Iterable<ColumnFamilyStore> cfses,
-                                      Collection<Range<Token>> ranges,
-                                      OperationType type,
-                                      boolean flushTables)
-    {
-        assert ranges.size() > 0;
-        logger.info("Beginning transfer to {}", session.getHost());
-        logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
-
-        if (flushTables)
-            flushSSTables(cfses);
-
-        List<SSTableReader> sstables = Lists.newLinkedList();
-        for (ColumnFamilyStore cfStore : cfses)
-        {
-            List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
-            for (Range<Token> range : ranges)
-                rowBoundsList.add(range.toRowBounds());
-            ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
-            sstables.addAll(view.sstables);
-        }
-
-        transferSSTables(session, sstables, ranges, type);
-    }
-
-    /**
-     * Low-level transfer of matching portions of a group of sstables from a single table to the target endpoint.
-     * You should probably call transferRanges instead. Note that this assumes references have already been acquired on the sstables.
-     */
-    public static void transferSSTables(StreamOutSession session, Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, OperationType type)
-    {
-        List<PendingFile> pending = createPendingFiles(sstables, ranges, type);
-
-        // Even if the list of pending files is empty, we need to initiate the transfer otherwise
-        // the remote end will hang in cases where this was a requested transfer.
-        session.addFilesToStream(pending);
-        session.begin();
-    }
-
-    // called prior to sending anything.
-    private static List<PendingFile> createPendingFiles(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, OperationType type)
-    {
-        List<PendingFile> pending = new ArrayList<PendingFile>();
-        for (SSTableReader sstable : sstables)
-        {
-            Descriptor desc = sstable.descriptor;
-            List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
-            if (sections.isEmpty())
-            {
-                // A reference was acquired on the sstable and we won't stream it
-                sstable.releaseReference();
-                continue;
-            }
-            CompressionInfo compression = null;
-            if (sstable.compression)
-            {
-                compression = new CompressionInfo(sstable.getCompressionMetadata().getChunksForSections(sections),
-                                                  sstable.getCompressionMetadata().parameters);
-            }
-            pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type, sstable.estimatedKeysForRanges(ranges), compression));
-        }
-        logger.info("Stream context metadata {}, {} sstables.", pending, Iterables.size(sstables));
-        return pending;
-    }
-}

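The exchange described in the Javadoc above reduces to a per-file acknowledgement loop on the receiving side. A schematic sketch -- the Status values mirror StreamReply.Status, while receiveFile, sendReply and the loop itself are simplifications rather than the actual handler code:

    import java.util.Queue;

    public class TargetSideSketch
    {
        enum Status { FILE_FINISHED, FILE_RETRY, SESSION_FINISHED }

        interface Wire
        {
            boolean receiveFile(String name);           // true once the file arrived intact
            void sendReply(String file, Status status);
        }

        static void runSession(Queue<String> files, Wire wire)
        {
            while (!files.isEmpty())
            {
                String file = files.peek();
                if (wire.receiveFile(file))
                {
                    files.poll();
                    wire.sendReply(file, Status.FILE_FINISHED); // source may delete its copy
                }
                else
                {
                    wire.sendReply(file, Status.FILE_RETRY);    // source re-streams the file
                }
            }
            // sent even when the file list was empty, so a node that requested a
            // transfer never hangs waiting for an answer
            wire.sendReply("", Status.SESSION_FINISHED);
        }
    }

As written this would retry a persistently corrupt file forever; the real session bounds retries via DatabaseDescriptor.getMaxStreamingRetries(), as StreamInSession.retry() above shows.
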
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamOutSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
deleted file mode 100644
index edc07ca..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDGen;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-/**
- * This class manages the streaming of multiple files one after the other.
- */
-public class StreamOutSession extends AbstractStreamSession
-{
-    private static final Logger logger = LoggerFactory.getLogger(StreamOutSession.class);
-
-    // one host may have multiple stream sessions.
-    private static final ConcurrentMap<UUID, StreamOutSession> streams = new NonBlockingHashMap<UUID, StreamOutSession>();
-
-    public static StreamOutSession create(String table, InetAddress host, IStreamCallback callback)
-    {
-        return create(table, host, UUIDGen.getTimeUUID(), callback);
-    }
-
-    public static StreamOutSession create(String table, InetAddress host, UUID sessionId)
-    {
-        return create(table, host, sessionId, null);
-    }
-
-    public static StreamOutSession create(String table, InetAddress host, UUID sessionId, IStreamCallback callback)
-    {
-        StreamOutSession session = new StreamOutSession(table, host, sessionId, callback);
-        streams.put(sessionId, session);
-        return session;
-    }
-
-    public static StreamOutSession get(UUID sessionId)
-    {
-        return streams.get(sessionId);
-    }
-
-    private final Map<String, PendingFile> files = new NonBlockingHashMap<String, PendingFile>();
-
-    private volatile String currentFile;
-
-    private StreamOutSession(String table, InetAddress host, UUID sessionId, IStreamCallback callback)
-    {
-        super(table, host, sessionId, callback);
-    }
-
-    public void addFilesToStream(List<PendingFile> pendingFiles)
-    {
-        for (PendingFile pendingFile : pendingFiles)
-        {
-            if (logger.isDebugEnabled())
-                logger.debug("Adding file {} to be streamed.", pendingFile.getFilename());
-            files.put(pendingFile.getFilename(), pendingFile);
-        }
-    }
-
-    public void retry()
-    {
-        streamFile(files.get(currentFile));
-    }
-
-    private void streamFile(PendingFile pf)
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("Streaming {} ...", pf);
-        currentFile = pf.getFilename();
-        MessagingService.instance().stream(new StreamHeader(table, getSessionId(), pf), getHost());
-    }
-
-    public void startNext()
-    {
-        assert files.containsKey(currentFile);
-        files.get(currentFile).sstable.releaseReference();
-        files.remove(currentFile);
-        Iterator<PendingFile> iter = files.values().iterator();
-        if (iter.hasNext())
-            streamFile(iter.next());
-    }
-
-    protected void closeInternal(boolean success)
-    {
-        // Release reference on last file (or any uncompleted ones)
-        for (PendingFile file : files.values())
-            file.sstable.releaseReference();
-        streams.remove(sessionId);
-    }
-
-    /** convenience method for use when testing */
-    void await() throws InterruptedException
-    {
-        while (streams.containsKey(sessionId))
-            Thread.sleep(10);
-    }
-
-    public Collection<PendingFile> getFiles()
-    {
-        return files.values();
-    }
-
-    public static Set<InetAddress> getDestinations()
-    {
-        Set<InetAddress> hosts = new HashSet<InetAddress>();
-        for (StreamOutSession session : streams.values())
-        {
-            hosts.add(session.getHost());
-        }
-        return hosts;
-    }
-
-    public static List<PendingFile> getOutgoingFiles(InetAddress host)
-    {
-        List<PendingFile> list = new ArrayList<PendingFile>();
-        for (Map.Entry<UUID, StreamOutSession> entry : streams.entrySet())
-        {
-            StreamOutSession session = entry.getValue();
-            if (session.getHost().equals(host))
-                list.addAll(session.getFiles());
-        }
-        return list;
-    }
-
-    public void validateCurrentFile(String file)
-    {
-        if (!file.equals(currentFile))
-            throw new IllegalStateException(String.format("target reports current file is %s but ours is %s", file, currentFile));
-    }
-
-    public void begin()
-    {
-        PendingFile first = files.isEmpty() ? null : files.values().iterator().next();
-        currentFile = first == null ? null : first.getFilename();
-        StreamHeader header = new StreamHeader(table, getSessionId(), first, files.values());
-        logger.info("Streaming to {}", getHost());
-        logger.debug("Files are {}", StringUtils.join(files.values(), ","));
-        MessagingService.instance().stream(header, getHost());
-    }
-}
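
Taken together, getDestinations() and getOutgoingFiles() support a nodetool netstats-style report of outbound transfers. A sketch against the API shown above; the output format is illustrative:

    import java.net.InetAddress;

    import org.apache.cassandra.streaming.PendingFile;
    import org.apache.cassandra.streaming.StreamOutSession;

    public class OutboundReportSketch
    {
        public static void report()
        {
            for (InetAddress host : StreamOutSession.getDestinations())
            {
                System.out.println("Streaming to " + host + ":");
                for (PendingFile file : StreamOutSession.getOutgoingFiles(host))
                    System.out.println("    " + file.getFilename());
            }
        }
    }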