Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/20 19:07:15 UTC

[3/5] Streaming 2.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
new file mode 100644
index 0000000..78d50ad
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * {@link StreamPlan} is a helper class that builds a stream operation with a given configuration.
+ *
+ * This is the class you want to use for building a streaming plan and starting streaming.
+ */
+public class StreamPlan
+{
+    private final UUID planId = UUIDGen.getTimeUUID();
+    private final String description;
+
+    // sessions per InetAddress of the other end.
+    private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
+
+    private boolean flushBeforeTransfer = true;
+
+    /**
+     * Start building a stream plan.
+     *
+     * @param description Stream type that describes this StreamPlan
+     */
+    public StreamPlan(String description)
+    {
+        this.description = description;
+    }
+
+    /**
+     * Request data in {@code keyspace} and {@code ranges} from a specific node.
+     *
+     * @param from endpoint address to fetch data from.
+     * @param keyspace name of keyspace
+     * @param ranges ranges to fetch
+     * @return this object for chaining
+     */
+    public StreamPlan requestRanges(InetAddress from, String keyspace, Collection<Range<Token>> ranges)
+    {
+        return requestRanges(from, keyspace, ranges, new String[0]);
+    }
+
+    /**
+     * Request data in {@code columnFamilies} under {@code keyspace} and {@code ranges} from a specific node.
+     *
+     * @param from endpoint address to fetch data from.
+     * @param keyspace name of keyspace
+     * @param ranges ranges to fetch
+     * @param columnFamilies specific column families
+     * @return this object for chaining
+     */
+    public StreamPlan requestRanges(InetAddress from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+    {
+        StreamSession session = getOrCreateSession(from);
+        session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies));
+        return this;
+    }
+
+    /**
+     * Add a transfer task to send data for the given keyspace and ranges.
+     *
+     * @param to endpoint address of receiver
+     * @param keyspace name of keyspace
+     * @param ranges ranges to send
+     * @return this object for chaining
+     */
+    public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges)
+    {
+        return transferRanges(to, keyspace, ranges, new String[0]);
+    }
+
+    /**
+     * Add a transfer task to send data of the specified {@code columnFamilies} under {@code keyspace} for the given {@code ranges}.
+     *
+     * @param to endpoint address of receiver
+     * @param keyspace name of keyspace
+     * @param ranges ranges to send
+     * @param columnFamilies specific column families
+     * @return this object for chaining
+     */
+    public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+    {
+        StreamSession session = getOrCreateSession(to);
+        session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer);
+        return this;
+    }
+
+    /**
+     * Add a transfer task to send the given SSTable files.
+     *
+     * @param to endpoint address of receiver
+     * @param ranges ranges to send
+     * @param sstables files to send
+     * @return this object for chaining
+     */
+    public StreamPlan transferFiles(InetAddress to, Collection<Range<Token>> ranges, Collection<SSTableReader> sstables)
+    {
+        StreamSession session = getOrCreateSession(to);
+        session.addTransferFiles(ranges, sstables);
+        return this;
+    }
+
+    /**
+     * @return true if this plan has nothing to execute
+     */
+    public boolean isEmpty()
+    {
+        return sessions.isEmpty();
+    }
+
+    /**
+     * Execute this {@link StreamPlan} asynchronously.
+     *
+     * @return Future {@link StreamState} that can be used to track streaming progress.
+     */
+    public StreamResultFuture execute()
+    {
+        return StreamResultFuture.startStreamingAsync(planId, description, sessions.values());
+    }
+
+    /**
+     * Set the flushBeforeTransfer option.
+     * When true, the node will flush memtables before streaming ranges. (Default: true)
+     *
+     * @param flushBeforeTransfer set to true when the node should flush before transfer
+     * @return this object for chaining
+     */
+    public StreamPlan flushBeforeTransfer(boolean flushBeforeTransfer)
+    {
+        this.flushBeforeTransfer = flushBeforeTransfer;
+        return this;
+    }
+
+    private StreamSession getOrCreateSession(InetAddress peer)
+    {
+        StreamSession session = sessions.get(peer);
+        if (session == null)
+        {
+            session = new StreamSession(peer);
+            sessions.put(peer, session);
+        }
+        return session;
+    }
+
+}
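
To illustrate how the fluent API above chains together: a minimal sketch of
fetching ranges from a peer and blocking on the result. The class and variable
names here are hypothetical; only StreamPlan, StreamResultFuture and
StreamState come from this patch.

    import java.net.InetAddress;
    import java.util.Collection;

    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.streaming.StreamPlan;
    import org.apache.cassandra.streaming.StreamResultFuture;
    import org.apache.cassandra.streaming.StreamState;

    public class StreamPlanExample
    {
        // source, keyspace and ranges are assumed to be supplied by the caller
        public static StreamState fetch(InetAddress source, String keyspace, Collection<Range<Token>> ranges) throws Exception
        {
            StreamResultFuture result = new StreamPlan("Example")
                                            .requestRanges(source, keyspace, ranges)
                                            .execute();
            // StreamResultFuture extends AbstractFuture<StreamState>, so get()
            // blocks until every session completes (or fails with StreamException)
            return result.get();
        }
    }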

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
new file mode 100644
index 0000000..a01de3c
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collection;
+import java.util.UUID;
+
+import com.google.common.base.Throwables;
+import com.ning.compress.lzf.LZFInputStream;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * StreamReader reads from a stream and writes to an SSTable.
+ */
+public class StreamReader
+{
+    protected final UUID cfId;
+    protected final long estimatedKeys;
+    protected final Collection<Pair<Long, Long>> sections;
+    protected final StreamSession session;
+
+    protected Descriptor desc;
+
+    public StreamReader(FileMessageHeader header, StreamSession session)
+    {
+        this.session = session;
+        this.cfId = header.cfId;
+        this.estimatedKeys = header.estimatedKeys;
+        this.sections = header.sections;
+    }
+
+    /**
+     * @param channel where this reads data from
+     * @return SSTable transferred
+     * @throws IOException if reading the remote sstable fails. A RuntimeException is thrown if the local write fails.
+     */
+    public SSTableReader read(ReadableByteChannel channel) throws IOException
+    {
+        long totalSize = totalSize();
+
+        Pair<String, String> kscf = Schema.instance.getCF(cfId);
+        ColumnFamilyStore cfs = Table.open(kscf.left).getColumnFamilyStore(kscf.right);
+        Directories.DataDirectory localDir = cfs.directories.getLocationCapableOfSize(totalSize);
+        if (localDir == null)
+            throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
+        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
+
+        SSTableWriter writer = new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys);
+        try
+        {
+            DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
+            BytesReadTracker in = new BytesReadTracker(dis);
+
+            while (in.getBytesRead() < totalSize)
+            {
+                writeRow(writer, in, cfs);
+                // TODO move this to BytesReadTracker
+                session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
+            }
+            return writer.closeAndOpenReader();
+        }
+        catch (Throwable e)
+        {
+            writer.abort();
+            if (e instanceof IOException)
+                throw (IOException) e;
+            else
+                throw Throwables.propagate(e);
+        }
+    }
+
+    protected long totalSize()
+    {
+        long size = 0;
+        for (Pair<Long, Long> section : sections)
+            size += section.right - section.left;
+        return size;
+    }
+
+    protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
+    {
+        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+        writer.appendFromStream(key, cfs.metadata, in);
+        cfs.invalidateCachedRow(key);
+    }
+}
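
The read loop above drives both its termination condition and its progress
reporting off BytesReadTracker, which counts bytes as they are consumed from
the decompressed stream. A self-contained sketch of the same counting-wrapper
idea using only java.io (an illustration of the pattern, not the actual
BytesReadTracker implementation):

    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Counts bytes consumed from the underlying stream so that a caller
    // can loop with "while (tracker.getBytesRead() < totalSize)".
    public class CountingInputStream extends FilterInputStream
    {
        private long bytesRead = 0;

        public CountingInputStream(InputStream in)
        {
            super(in);
        }

        @Override
        public int read() throws IOException
        {
            int b = super.read();
            if (b >= 0)
                bytesRead++;
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException
        {
            int n = super.read(buf, off, len);
            if (n > 0)
                bytesRead += n;
            return n;
        }

        public long getBytesRead()
        {
            return bytesRead;
        }
    }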

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
new file mode 100644
index 0000000..af8c138
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Task that manages receiving files in a session for a certain ColumnFamily.
+ */
+public class StreamReceiveTask extends StreamTask
+{
+    // number of files to receive
+    private final int totalFiles;
+    // total size of files to receive
+    private final long totalSize;
+
+    // holds references to SSTables received
+    protected Collection<SSTableReader> sstables;
+
+    public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
+    {
+        super(session, cfId);
+        this.totalFiles = totalFiles;
+        this.totalSize = totalSize;
+        this.sstables = new ArrayList<>(totalFiles);
+    }
+
+    /**
+     * Process received file.
+     *
+     * @param sstable SSTable file received.
+     */
+    public void receive(SSTableReader sstable)
+    {
+        assert cfId.equals(sstable.metadata.cfId);
+
+        sstables.add(sstable);
+        if (sstables.size() == totalFiles)
+            complete();
+    }
+
+    public int getTotalNumberOfFiles()
+    {
+        return totalFiles;
+    }
+
+    public long getTotalSize()
+    {
+        return totalSize;
+    }
+
+    // TODO should be run in background so that this does not block streaming
+    private void complete()
+    {
+        if (!SSTableReader.acquireReferences(sstables))
+            throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred");
+        try
+        {
+            Pair<String, String> kscf = Schema.instance.getCF(cfId);
+            ColumnFamilyStore cfs = Table.open(kscf.left).getColumnFamilyStore(kscf.right);
+            // add sstables and build secondary indexes
+            cfs.addSSTables(sstables);
+            cfs.indexManager.maybeBuildSecondaryIndexes(sstables, cfs.indexManager.allIndexesNames());
+        }
+        finally
+        {
+            SSTableReader.releaseReferences(sstables);
+        }
+
+        session.taskCompleted(this);
+    }
+}
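
The TODO above notes that complete() currently blocks the stream thread while
sstables are added and secondary indexes are built. A sketch of the obvious
shape of the fix, deferring the work to an executor; the executor and names
here are hypothetical, not part of this patch:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class DeferredCompletionExample
    {
        // single thread keeps completions for a session ordered
        private static final ExecutorService executor = Executors.newSingleThreadExecutor();

        public static void main(String[] args)
        {
            executor.submit(new Runnable()
            {
                public void run()
                {
                    // stand-in for the body of complete(): acquire references,
                    // cfs.addSSTables(...), build secondary indexes, release,
                    // then session.taskCompleted(...)
                    System.out.println("completion work runs off the stream thread");
                }
            });
            executor.shutdown();
        }
    }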

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReply.java b/src/java/org/apache/cassandra/streaming/StreamReply.java
deleted file mode 100644
index d54e9cc..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamReply.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-public class StreamReply
-{
-    static enum Status
-    {
-        FILE_FINISHED,
-        FILE_RETRY,
-        SESSION_FINISHED,
-        SESSION_FAILURE,
-    }
-
-    public static final IVersionedSerializer<StreamReply> serializer = new FileStatusSerializer();
-
-    public final UUID sessionId;
-    public final String file;
-    public final Status action;
-
-    public StreamReply(String file, UUID sessionId, Status action)
-    {
-        this.file = file;
-        this.action = action;
-        this.sessionId = sessionId;
-    }
-
-    public MessageOut<StreamReply> createMessage()
-    {
-        return new MessageOut<StreamReply>(MessagingService.Verb.STREAM_REPLY, this, serializer);
-    }
-
-    @Override
-    public String toString()
-    {
-        return "StreamReply(" +
-               "sessionId=" + sessionId +
-               ", file='" + file + '\'' +
-               ", action=" + action +
-               ')';
-    }
-
-    private static class FileStatusSerializer implements IVersionedSerializer<StreamReply>
-    {
-        public void serialize(StreamReply reply, DataOutput out, int version) throws IOException
-        {
-            UUIDSerializer.serializer.serialize(reply.sessionId, out, MessagingService.current_version);
-            out.writeUTF(reply.file);
-            out.writeInt(reply.action.ordinal());
-        }
-
-        public StreamReply deserialize(DataInput in, int version) throws IOException
-        {
-            UUID sessionId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
-            String targetFile = in.readUTF();
-            Status action = Status.values()[in.readInt()];
-            return new StreamReply(targetFile, sessionId, action);
-        }
-
-        public long serializedSize(StreamReply reply, int version)
-        {
-            return TypeSizes.NATIVE.sizeof(reply.sessionId) + TypeSizes.NATIVE.sizeof(reply.file) + TypeSizes.NATIVE.sizeof(reply.action.ordinal());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
deleted file mode 100644
index b69b6d0..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-
-public class StreamReplyVerbHandler implements IVerbHandler<StreamReply>
-{
-    private static final Logger logger = LoggerFactory.getLogger(StreamReplyVerbHandler.class);
-
-    public void doVerb(MessageIn<StreamReply> message, int id)
-    {
-        StreamReply reply = message.payload;
-        logger.debug("Received StreamReply {}", reply);
-        StreamOutSession session = StreamOutSession.get(reply.sessionId);
-        if (session == null)
-        {
-            logger.debug("Received stream action " + reply.action + " for an unknown session from " + message.from);
-            return;
-        }
-
-        switch (reply.action)
-        {
-            case FILE_FINISHED:
-                logger.info("Successfully sent {} to {}", reply.file, message.from);
-                session.validateCurrentFile(reply.file);
-                session.startNext();
-                break;
-            case FILE_RETRY:
-                session.validateCurrentFile(reply.file);
-                logger.info("Need to re-stream file {} to {}", reply.file, message.from);
-                session.retry();
-                break;
-            case SESSION_FINISHED:
-                session.close(true);
-                break;
-            case SESSION_FAILURE:
-                session.close(false);
-                break;
-            default:
-                throw new RuntimeException("Cannot handle FileStatus.Action: " + reply.action);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java
index 465d8bf..9d3fdb2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequest.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -20,173 +20,77 @@ package org.apache.cassandra.streaming;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
-import java.util.UUID;
 
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDSerializer;
 
-/**
-* This class encapsulates the message that needs to be sent to nodes
-* that handoff data. The message contains information about ranges
-* that need to be transferred and the target node.
-*
-* If a file is specified, ranges and table will not. vice-versa should hold as well.
-*/
 public class StreamRequest
 {
     public static final IVersionedSerializer<StreamRequest> serializer = new StreamRequestSerializer();
 
-    protected final UUID sessionId;
-    protected final InetAddress target;
-
-    // if this is specified, ranges and table should not be.
-    protected final PendingFile file;
+    public final String keyspace;
+    public final Collection<Range<Token>> ranges;
+    public final Collection<String> columnFamilies = new HashSet<>();
 
-    // if these are specified, file should not be.
-    protected final Collection<Range<Token>> ranges;
-    protected final String table;
-    protected final Iterable<ColumnFamilyStore> columnFamilies;
-    protected final OperationType type;
-
-    StreamRequest(InetAddress target, Collection<Range<Token>> ranges, String table, Iterable<ColumnFamilyStore> columnFamilies, UUID sessionId, OperationType type)
+    public StreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies)
     {
-        this.target = target;
+        this.keyspace = keyspace;
         this.ranges = ranges;
-        this.table = table;
-        this.columnFamilies = columnFamilies;
-        this.sessionId = sessionId;
-        this.type = type;
-        file = null;
-    }
-
-    StreamRequest(InetAddress target, PendingFile file, UUID sessionId)
-    {
-        this.target = target;
-        this.file = file;
-        this.sessionId = sessionId;
-        this.type = file.type;
-        ranges = null;
-        table = null;
-        columnFamilies = null;
-    }
-
-    public MessageOut<StreamRequest> createMessage()
-    {
-        return new MessageOut<StreamRequest>(MessagingService.Verb.STREAM_REQUEST, this, serializer);
+        this.columnFamilies.addAll(columnFamilies);
     }
 
-    public String toString()
+    public static class StreamRequestSerializer implements IVersionedSerializer<StreamRequest>
     {
-        StringBuilder sb = new StringBuilder("");
-        if (file == null)
+        public void serialize(StreamRequest request, DataOutput out, int version) throws IOException
         {
-            sb.append(table);
-            sb.append("@");
-            sb.append(columnFamilies.toString());
-            sb.append("@");
-            sb.append(target);
-            sb.append("------->");
-            for (Range<Token> range : ranges)
+            out.writeUTF(request.keyspace);
+            out.writeInt(request.ranges.size());
+            for (Range<Token> range : request.ranges)
             {
-                sb.append(range);
-                sb.append(" ");
+                Token.serializer.serialize(range.left, out);
+                Token.serializer.serialize(range.right, out);
             }
-            sb.append(type);
-        }
-        else
-        {
-            sb.append(file.toString());
+            out.writeInt(request.columnFamilies.size());
+            for (String cf : request.columnFamilies)
+                out.writeUTF(cf);
         }
-        return sb.toString();
-    }
 
-    private static class StreamRequestSerializer implements IVersionedSerializer<StreamRequest>
-    {
-        public void serialize(StreamRequest srm, DataOutput out, int version) throws IOException
+        public StreamRequest deserialize(DataInput in, int version) throws IOException
         {
-            UUIDSerializer.serializer.serialize(srm.sessionId, out, MessagingService.current_version);
-            CompactEndpointSerializationHelper.serialize(srm.target, out);
-            if (srm.file != null)
+            String keyspace = in.readUTF();
+            int rangeCount = in.readInt();
+            List<Range<Token>> ranges = new ArrayList<>(rangeCount);
+            for (int i = 0; i < rangeCount; i++)
             {
-                out.writeBoolean(true);
-                PendingFile.serializer.serialize(srm.file, out, version);
-            }
-            else
-            {
-                out.writeBoolean(false);
-                out.writeUTF(srm.table);
-                out.writeInt(srm.ranges.size());
-                for (Range<Token> range : srm.ranges)
-                    AbstractBounds.serializer.serialize(range, out, version);
-
-                out.writeUTF(srm.type.name());
-
-                out.writeInt(Iterables.size(srm.columnFamilies));
-                for (ColumnFamilyStore cfs : srm.columnFamilies)
-                    ColumnFamily.serializer.serializeCfId(cfs.metadata.cfId, out, version);
+                Token left = Token.serializer.deserialize(in);
+                Token right = Token.serializer.deserialize(in);
+                ranges.add(new Range<>(left, right));
             }
+            int cfCount = in.readInt();
+            List<String> columnFamilies = new ArrayList<>(cfCount);
+            for (int i = 0; i < cfCount; i++)
+                columnFamilies.add(in.readUTF());
+            return new StreamRequest(keyspace, ranges, columnFamilies);
         }
 
-        public StreamRequest deserialize(DataInput in, int version) throws IOException
+        public long serializedSize(StreamRequest request, int version)
         {
-            UUID sessionId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
-            InetAddress target = CompactEndpointSerializationHelper.deserialize(in);
-            boolean singleFile = in.readBoolean();
-            if (singleFile)
+            int size = TypeSizes.NATIVE.sizeof(request.keyspace);
+            size += TypeSizes.NATIVE.sizeof(request.ranges.size());
+            for (Range<Token> range : request.ranges)
             {
-                PendingFile file = PendingFile.serializer.deserialize(in, version);
-                return new StreamRequest(target, file, sessionId);
+                size += Token.serializer.serializedSize(range.left, TypeSizes.NATIVE);
+                size += Token.serializer.serializedSize(range.right, TypeSizes.NATIVE);
             }
-            else
-            {
-                String table = in.readUTF();
-                int size = in.readInt();
-                List<Range<Token>> ranges = (size == 0) ? null : new ArrayList<Range<Token>>(size);
-                for (int i = 0; i < size; ++i)
-                    ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(in, version).toTokenBounds());
-                OperationType type = OperationType.valueOf(in.readUTF());
-
-                List<ColumnFamilyStore> stores = new ArrayList<ColumnFamilyStore>();
-                int cfsSize = in.readInt();
-                for (int i = 0; i < cfsSize; ++i)
-                    stores.add(Table.open(table).getColumnFamilyStore(ColumnFamily.serializer.deserializeCfId(in, version)));
-
-                return new StreamRequest(target, ranges, table, stores, sessionId, type);
-            }
-        }
-
-        public long serializedSize(StreamRequest sr, int version)
-        {
-            long size = TypeSizes.NATIVE.sizeof(sr.sessionId);
-            size += CompactEndpointSerializationHelper.serializedSize(sr.target);
-            size += TypeSizes.NATIVE.sizeof(true);
-            if (sr.file != null)
-                return size + PendingFile.serializer.serializedSize(sr.file, version);
-
-            size += TypeSizes.NATIVE.sizeof(sr.table);
-            size += TypeSizes.NATIVE.sizeof(sr.ranges.size());
-            for (Range<Token> range : sr.ranges)
-                size += AbstractBounds.serializer.serializedSize(range, version);
-            size += TypeSizes.NATIVE.sizeof(sr.type.name());
-            size += TypeSizes.NATIVE.sizeof(Iterables.size(sr.columnFamilies));
-            for (ColumnFamilyStore cfs : sr.columnFamilies)
-                size += ColumnFamily.serializer.cfIdSerializedSize(cfs.metadata.cfId, TypeSizes.NATIVE, version);
+            size += TypeSizes.NATIVE.sizeof(request.columnFamilies.size());
+            for (String cf : request.columnFamilies)
+                size += TypeSizes.NATIVE.sizeof(cf);
             return size;
         }
     }
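
The new serializer writes keyspace, ranges (as token pairs) and column family
names symmetrically, so a round trip through a byte array recovers an
equivalent request. A sketch, assuming a StreamRequest built elsewhere (its
ranges need tokens from the configured partitioner, which this snippet does
not set up):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.cassandra.net.MessagingService;
    import org.apache.cassandra.streaming.StreamRequest;

    public class StreamRequestRoundTrip
    {
        public static StreamRequest roundTrip(StreamRequest request) throws IOException
        {
            int version = MessagingService.current_version;

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            StreamRequest.serializer.serialize(request, new DataOutputStream(baos), version);

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
            return StreamRequest.serializer.deserialize(in, version);
        }
    }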

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
deleted file mode 100644
index bbed34d..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.cassandra.streaming;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-
-/**
- * This verb handler handles the StreamRequestMessage that is sent by
- * the node requesting range transfer.
-*/
-public class StreamRequestVerbHandler implements IVerbHandler<StreamRequest>
-{
-    private static final Logger logger = LoggerFactory.getLogger(StreamRequestVerbHandler.class);
-
-    public void doVerb(MessageIn<StreamRequest> message, int id)
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("Received a StreamRequestMessage from {}", message.from);
-
-        StreamRequest srm = message.payload;
-        if (logger.isDebugEnabled())
-            logger.debug(srm.toString());
-
-        StreamOutSession session = StreamOutSession.create(srm.table, message.from, srm.sessionId);
-        StreamOut.transferRanges(session, srm.columnFamilies, srm.ranges, srm.type);
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
new file mode 100644
index 0000000..84332bd
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * StreamResultFuture asynchronously returns the final {@link StreamState} of the execution of a {@link StreamPlan}.
+ * <p>
+ * You can attach a {@link StreamEventHandler} to this object to listen on {@link StreamEvent}s and track streaming progress.
+ */
+public final class StreamResultFuture extends AbstractFuture<StreamState>
+{
+    // Executor that establishes the streaming connection. Once we're connected to the other end, the rest of the streaming
+    // is directly handled by the ConnectionHandler's incoming and outgoing threads.
+    private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
+                                                                                                                            FBUtilities.getAvailableProcessors());
+
+    public final UUID planId;
+    public final String description;
+    private final List<StreamEventHandler> eventListeners = Collections.synchronizedList(new ArrayList<StreamEventHandler>());
+    private final Set<UUID> ongoingSessions;
+    private final Map<InetAddress, SessionInfo> sessionStates = new NonBlockingHashMap<>();
+
+    /**
+     * Create a new StreamResultFuture for the given {@code planId} and description.
+     *
+     * Constructor is package private. You need to use {@link StreamPlan#execute()} to get an instance.
+     *
+     * @param planId Stream plan ID
+     * @param description Stream description
+     * @param sessions IDs of the sessions whose completion this future waits for
+     */
+    private StreamResultFuture(UUID planId, String description, Set<UUID> sessions)
+    {
+        this.planId = planId;
+        this.description = description;
+        this.ongoingSessions = sessions;
+
+        // if there are no sessions to listen to, we set the result immediately
+        if (sessions.isEmpty())
+            set(getCurrentState());
+    }
+
+    static StreamResultFuture startStreamingAsync(UUID planId, String description, Collection<StreamSession> sessions)
+    {
+        Set<UUID> sessionIds = new HashSet<>(sessions.size());
+        for (StreamSession session : sessions)
+            sessionIds.add(session.id);
+
+        StreamResultFuture future = new StreamResultFuture(planId, description, sessionIds);
+
+        StreamManager.instance.register(future);
+
+        // start sessions
+        for (StreamSession session : sessions)
+        {
+            session.register(future);
+            // register to gossiper/FD to fail on node failure
+            Gossiper.instance.register(session);
+            FailureDetector.instance.registerFailureDetectionEventListener(session);
+            streamExecutor.submit(session);
+        }
+        return future;
+    }
+
+    public void addEventListener(StreamEventHandler listener)
+    {
+        Futures.addCallback(this, listener);
+        eventListeners.add(listener);
+    }
+
+    /**
+     * @return Current snapshot of streaming progress.
+     */
+    public StreamState getCurrentState()
+    {
+        return new StreamState(planId, description, ImmutableSet.copyOf(sessionStates.values()));
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        StreamResultFuture that = (StreamResultFuture) o;
+        return planId.equals(that.planId);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return planId.hashCode();
+    }
+
+    void handleSessionPrepared(StreamSession session)
+    {
+        SessionInfo sessionInfo = session.getSessionInfo();
+        StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo);
+        sessionStates.put(sessionInfo.peer, sessionInfo);
+        fireStreamEvent(event);
+    }
+
+    void handleSessionComplete(StreamSession session)
+    {
+        Gossiper.instance.unregister(session);
+        FailureDetector.instance.unregisterFailureDetectionEventListener(session);
+
+        SessionInfo sessionInfo = session.getSessionInfo();
+        sessionStates.put(sessionInfo.peer, sessionInfo);
+        fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
+        maybeComplete(session.id);
+    }
+
+    public void handleProgress(ProgressInfo progress)
+    {
+        sessionStates.get(progress.peer).updateProgress(progress);
+        fireStreamEvent(new StreamEvent.ProgressEvent(planId, progress));
+    }
+
+    void fireStreamEvent(StreamEvent event)
+    {
+        // delegate to listener
+        for (StreamEventHandler listener : eventListeners)
+            listener.handleStreamEvent(event);
+    }
+
+    private synchronized void maybeComplete(UUID sessionId)
+    {
+        ongoingSessions.remove(sessionId);
+        if (ongoingSessions.isEmpty())
+        {
+            StreamState finalState = getCurrentState();
+            if (finalState.hasFailedSession())
+                setException(new StreamException(finalState, "Stream failed"));
+            else
+                set(finalState);
+        }
+    }
+}
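
A sketch of a listener for this future, assuming StreamEventHandler also
implements Guava's FutureCallback<StreamState> (as implied by addEventListener
passing it to Futures.addCallback); the planId field access follows this
patch's StreamState construction:

    import org.apache.cassandra.streaming.StreamEvent;
    import org.apache.cassandra.streaming.StreamEventHandler;
    import org.apache.cassandra.streaming.StreamState;

    public class LoggingStreamHandler implements StreamEventHandler
    {
        public void handleStreamEvent(StreamEvent event)
        {
            // fired for session prepared/complete and per-file progress events
            System.out.println("stream event: " + event);
        }

        public void onSuccess(StreamState finalState)
        {
            System.out.println("stream plan " + finalState.planId + " finished");
        }

        public void onFailure(Throwable t)
        {
            t.printStackTrace();
        }
    }

Attach it with future.addEventListener(new LoggingStreamHandler()) and it will
receive events until the plan finishes.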

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
new file mode 100644
index 0000000..7d96f43
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.*;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.*;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.metrics.StreamingMetrics;
+import org.apache.cassandra.streaming.messages.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * StreamSession is the center of the Cassandra Streaming API.
+ *
+ * StreamSessions on both endpoints exchange messages and files until the session completes.
+ *
+ * It is created through {@link StreamPlan} on the initiator node,
+ * and is also created directly from the connected socket on the other end when the init message is received.
+ *
+ * <p>
+ * StreamSession goes through several stages:
+ * <ol>
+ *  <li>
+ *    Init
+ *    <p>The StreamSession on one end sends an init message to the other end.</p>
+ *  </li>
+ *  <li>
+ *    Prepare
+ *    <p>StreamSessions now exist on both endpoints; in this phase they exchange
+ *    request and summary messages to prepare for receiving/streaming files in the next phase.</p>
+ *  </li>
+ *  <li>
+ *    Stream
+ *    <p>StreamSessions on both ends stream and receive files.</p>
+ *  </li>
+ *  <li>
+ *    Complete
+ *    <p>The session completes once both endpoints have exchanged complete messages.</p>
+ *  </li>
+ * </ol>
+ */
+public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+{
+    private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
+
+    public final UUID id = UUIDGen.getTimeUUID();
+    public final InetAddress peer;
+
+    // should not be null when session is started
+    private StreamResultFuture streamResult;
+
+    // stream requests to send to the peer
+    private final List<StreamRequest> requests = new ArrayList<>();
+    // streaming tasks are created and managed per ColumnFamily ID
+    private final Map<UUID, StreamTransferTask> transfers = new HashMap<>();
+    // data receivers, filled after receiving prepare message
+    private final Map<UUID, StreamReceiveTask> receivers = new HashMap<>();
+    private final StreamingMetrics metrics;
+
+    public final ConnectionHandler handler;
+
+    private int retries;
+
+    public static enum State
+    {
+        INITIALIZING,
+        PREPARING,
+        STREAMING,
+        WAIT_COMPLETE,
+        COMPLETE,
+        FAILED,
+    }
+
+    private volatile State state = State.INITIALIZING;
+
+    /**
+     * Create a new streaming session with the peer.
+     *
+     * @param peer Address of streaming peer
+     */
+    public StreamSession(InetAddress peer)
+    {
+        this.peer = peer;
+        this.handler = new ConnectionHandler(this);
+        this.metrics = StreamingMetrics.get(peer);
+    }
+
+    /**
+     * Create a streaming session from an established connection.
+     *
+     * @param socket established connection
+     * @param protocolVersion streaming protocol version
+     */
+    public StreamSession(Socket socket, int protocolVersion)
+    {
+        this.peer = socket.getInetAddress();
+        this.handler = new ConnectionHandler(this, socket, protocolVersion);
+        this.metrics = StreamingMetrics.get(peer);
+    }
+
+    public UUID planId()
+    {
+        return streamResult == null ? null : streamResult.planId;
+    }
+
+    public String description()
+    {
+        return streamResult == null ? null : streamResult.description;
+    }
+
+    public static StreamSession startReceivingStreamAsync(UUID planId, String description, Socket socket, int version)
+    {
+        StreamSession session = new StreamSession(socket, version);
+        StreamResultFuture.startStreamingAsync(planId, description, Collections.singleton(session));
+        return session;
+    }
+
+    /**
+     * Bind this session to report to a specific {@link StreamResultFuture}.
+     *
+     * @param streamResult result to report to
+     * @return this object for chaining
+     */
+    public StreamSession register(StreamResultFuture streamResult)
+    {
+        this.streamResult = streamResult;
+        return this;
+    }
+
+    /**
+     * Add a data fetch request to this session.
+     *
+     * @param keyspace Requesting keyspace
+     * @param ranges Ranges to retrieve data for
+     * @param columnFamilies ColumnFamily names. Can be empty when requesting all CFs under the keyspace.
+     */
+    public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies)
+    {
+        requests.add(new StreamRequest(keyspace, ranges, columnFamilies));
+    }
+
+    /**
+     * Set up transfer for the specific keyspace/ranges/CFs.
+     *
+     * @param keyspace Transfer keyspace
+     * @param ranges Transfer ranges
+     * @param columnFamilies Transfer ColumnFamilies
+     * @param flushTables whether to flush the tables before transfer
+     */
+    public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables)
+    {
+        Collection<ColumnFamilyStore> stores = new HashSet<>();
+        // if columnfamilies are not specified, we add all cf under the keyspace
+        if (columnFamilies.isEmpty())
+        {
+            stores.addAll(Table.open(keyspace).getColumnFamilyStores());
+        }
+        else
+        {
+            for (String cf : columnFamilies)
+                stores.add(Table.open(keyspace).getColumnFamilyStore(cf));
+        }
+
+        if (flushTables)
+            flushSSTables(stores);
+
+        List<SSTableReader> sstables = Lists.newLinkedList();
+        for (ColumnFamilyStore cfStore : stores)
+        {
+            List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
+            for (Range<Token> range : ranges)
+                rowBoundsList.add(range.toRowBounds());
+            ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
+            sstables.addAll(view.sstables);
+        }
+        addTransferFiles(ranges, sstables);
+    }
+
+    /**
+     * Set up transfer of the specific SSTables.
+     * {@code sstables} must be marked as referenced so that they do not get deleted until the transfer completes.
+     *
+     * @param ranges Transfer ranges
+     * @param sstables Transfer files
+     */
+    public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables)
+    {
+        for (SSTableReader sstable : sstables)
+        {
+            List<Pair<Long, Long>> sections = sstable.getPositionsForRanges(ranges);
+            if (sections.isEmpty())
+            {
+                // A reference was acquired on the sstable and we won't stream it
+                sstable.releaseReference();
+                continue;
+            }
+            long estimatedKeys = sstable.estimatedKeysForRanges(ranges);
+            UUID cfId = sstable.metadata.cfId;
+            StreamTransferTask task = transfers.get(cfId);
+            if (task == null)
+            {
+                task = new StreamTransferTask(this, cfId);
+                transfers.put(cfId, task);
+            }
+            task.addTransferFile(sstable, estimatedKeys, sections);
+        }
+    }
+
+    /**
+     * Start this stream session.
+     */
+    public void run()
+    {
+        assert streamResult != null : "No result is associated with this session";
+
+        try
+        {
+            if (handler.isConnected())
+            {
+                // if this session is created from remote...
+                handler.start();
+            }
+            else
+            {
+                if (requests.isEmpty() && transfers.isEmpty())
+                {
+                    logger.debug("Session does not have any tasks.");
+                    state(State.COMPLETE);
+                    streamResult.handleSessionComplete(this);
+                }
+                else
+                {
+                    handler.connect();
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            onError(e);
+        }
+    }
+
+    /**
+     * Set current state to {@code newState}.
+     *
+     * @param newState new state to set
+     */
+    public void state(State newState)
+    {
+        state = newState;
+    }
+
+    /**
+     * @return current state
+     */
+    public State state()
+    {
+        return state;
+    }
+
+    /**
+     * Return whether this session completed successfully.
+     *
+     * @return true if the session completed successfully.
+     */
+    public boolean isSuccess()
+    {
+        return state == State.COMPLETE;
+    }
+
+    public void messageReceived(StreamMessage message)
+    {
+        switch (message.type)
+        {
+            case PREPARE:
+                PrepareMessage msg = (PrepareMessage) message;
+                prepare(msg.requests, msg.summaries);
+                break;
+
+            case FILE:
+                receive((FileMessage) message);
+                break;
+
+            case RETRY:
+                RetryMessage retry = (RetryMessage) message;
+                retry(retry.cfId, retry.sequenceNumber);
+                break;
+
+            case COMPLETE:
+                complete();
+                break;
+
+            case SESSION_FAILED:
+                sessionFailed();
+                break;
+        }
+    }
+
+    /**
+     * Callback for connection success.
+     *
+     * When connected, the session moves to the preparing phase and sends a prepare message.
+     */
+    public void onConnect()
+    {
+        logger.debug("Connected. Sending prepare...");
+
+        // send prepare message
+        state(State.PREPARING);
+        PrepareMessage prepare = new PrepareMessage();
+        prepare.requests.addAll(requests);
+        for (StreamTransferTask task : transfers.values())
+            prepare.summaries.add(task.getSummary());
+        handler.sendMessage(prepare);
+
+        // if we don't need to prepare for receiving stream, start sending files immediately
+        if (requests.isEmpty())
+        {
+            logger.debug("Prepare complete. Start streaming files.");
+            startStreamingFiles();
+        }
+    }
+
+    /**
+     * Callback for handling an exception raised during streaming.
+     *
+     * @param e thrown exception
+     */
+    public void onError(Throwable e)
+    {
+        state(State.FAILED);
+
+        logger.error("Streaming error occurred", e);
+        // send session failure message
+        handler.sendMessage(new SessionFailedMessage());
+        // fail session
+        streamResult.handleSessionComplete(this);
+    }
+
+    /**
+     * Prepare this session for sending/receiving files.
+     */
+    public void prepare(Collection<StreamRequest> requests, Collection<StreamSummary> summaries)
+    {
+        logger.debug("Start preparing this session (" + requests.size() + " requests, " + summaries.size() + " columnfamilies receiving)");
+        // prepare tasks
+        state(State.PREPARING);
+        for (StreamRequest request : requests)
+            addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true); // always flush on stream request
+        for (StreamSummary summary : summaries)
+            prepareReceiving(summary);
+
+        // send back prepare message if prepare message contains stream request
+        if (!requests.isEmpty())
+        {
+            PrepareMessage prepare = new PrepareMessage();
+            for (StreamTransferTask task : transfers.values())
+                prepare.summaries.add(task.getSummary());
+            handler.sendMessage(prepare);
+        }
+
+        // if there are files to stream
+        if (!maybeCompleted())
+        {
+            logger.debug("Prepare complete. Start streaming files.");
+            startStreamingFiles();
+        }
+    }
+
+    /**
+     * Callback after sending a FileMessageHeader.
+     *
+     * @param header sent header
+     */
+    public void fileSent(FileMessageHeader header)
+    {
+        StreamingMetrics.totalOutgoingBytes.inc(header.size());
+        metrics.outgoingBytes.inc(header.size());
+        transfers.get(header.cfId).complete(header.sequenceNumber);
+    }
+
+    /**
+     * Callback after receiving a FileMessage.
+     *
+     * @param message received file message
+     */
+    public void receive(FileMessage message)
+    {
+        StreamingMetrics.totalIncomingBytes.inc(message.header.size());
+        metrics.incomingBytes.inc(message.header.size());
+        receivers.get(message.header.cfId).receive(message.sstable);
+    }
+
+    public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total)
+    {
+        ProgressInfo progress = new ProgressInfo(peer, desc.filenameFor(Component.DATA), direction, bytes, total);
+        streamResult.handleProgress(progress);
+    }
+
+    /**
+     * Callback on receiving a {@code StreamMessage.Type.RETRY} message.
+     *
+     * @param cfId ColumnFamily ID
+     * @param sequenceNumber Sequence number indicating which file to stream again
+     */
+    public void retry(UUID cfId, int sequenceNumber)
+    {
+        FileMessage message = transfers.get(cfId).createMessageForRetry(sequenceNumber);
+        handler.sendMessage(message);
+    }
+
+    /**
+     * Check whether the session is complete on receiving a {@code StreamMessage.Type.COMPLETE} message.
+     */
+    public synchronized void complete()
+    {
+        if (state == State.WAIT_COMPLETE)
+        {
+            state(State.COMPLETE);
+            handler.close();
+            streamResult.handleSessionComplete(this);
+        }
+        else
+        {
+            state(State.WAIT_COMPLETE);
+        }
+    }
+
+    /**
+     * Callback invoked on receiving a {@code StreamMessage.Type.SESSION_FAILED} message.
+     */
+    public synchronized void sessionFailed()
+    {
+        handler.close();
+        streamResult.handleSessionComplete(this);
+    }
+
+    public void doRetry(FileMessageHeader header, Throwable e)
+    {
+        // retry
+        retries++;
+        if (retries > DatabaseDescriptor.getMaxStreamingRetries())
+            onError(new IOException("Too many retries for " + header, e));
+        else
+            handler.sendMessage(new RetryMessage(header.cfId, header.sequenceNumber));
+    }
+
+    /**
+     * @return current snapshot of this session's info.
+     */
+    public SessionInfo getSessionInfo()
+    {
+        List<StreamSummary> receivingSummaries = Lists.newArrayList();
+        for (StreamTask receiver : receivers.values())
+            receivingSummaries.add(receiver.getSummary());
+        List<StreamSummary> transferSummaries = Lists.newArrayList();
+        for (StreamTask transfer : transfers.values())
+            transferSummaries.add(transfer.getSummary());
+        return new SessionInfo(peer, receivingSummaries, transferSummaries, state);
+    }
+
+    public synchronized void taskCompleted(StreamReceiveTask completedTask)
+    {
+        receivers.remove(completedTask.cfId);
+        maybeCompleted();
+    }
+
+    public synchronized void taskCompleted(StreamTransferTask completedTask)
+    {
+        transfers.remove(completedTask.cfId);
+        maybeCompleted();
+    }
+
+    public void onJoin(InetAddress endpoint, EndpointState epState) {}
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+
+    public void onRemove(InetAddress endpoint)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void onRestart(InetAddress endpoint, EndpointState epState)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void convict(InetAddress endpoint, double phi)
+    {
+        if (!endpoint.equals(peer))
+            return;
+
+        // We want higher confidence in the failure detection than usual because wrongly failing a stream is costly.
+        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+            return;
+
+        state(State.FAILED);
+        streamResult.handleSessionComplete(this);
+    }
+
+    private boolean maybeCompleted()
+    {
+        boolean completed = receivers.isEmpty() && transfers.isEmpty();
+        if (completed)
+        {
+            if (state == State.WAIT_COMPLETE)
+            {
+                state(State.COMPLETE);
+                handler.close();
+                streamResult.handleSessionComplete(this);
+            }
+            else
+            {
+                // notify peer that this session is completed
+                handler.sendMessage(new CompleteMessage());
+                state(State.WAIT_COMPLETE);
+            }
+        }
+        return completed;
+    }
+
+
+    /**
+     * Flushes memtables of the given column family stores and waits for
+     * all pending flushes to complete.
+     */
+    private void flushSSTables(Iterable<ColumnFamilyStore> stores)
+    {
+        logger.info("Flushing memtables for {}...", stores);
+        List<Future<?>> flushes = new ArrayList<>();
+        for (ColumnFamilyStore cfs : stores)
+            flushes.add(cfs.forceFlush());
+        FBUtilities.waitOnFutures(flushes);
+    }
+
+    private void prepareReceiving(StreamSummary summary)
+    {
+        logger.debug("prepare receiving " + summary);
+        if (summary.files > 0)
+            receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
+    }
+
+    private void startStreamingFiles()
+    {
+        streamResult.handleSessionPrepared(this);
+
+        state(State.STREAMING);
+        for (StreamTransferTask task : transfers.values())
+        {
+            if (task.getFileMessages().size() > 0)
+                handler.sendMessages(task.getFileMessages());
+            else
+                taskCompleted(task); // there is no file to send
+        }
+    }
+}
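
The completion logic above is a two-phase handshake: whichever side drains its
receive/transfer tasks first sends COMPLETE and parks in WAIT_COMPLETE
(maybeCompleted()), while a side that has already seen the peer's COMPLETE
(complete()) can close as soon as its own tasks drain. Below is a minimal,
self-contained model of the crossing-COMPLETE case; Side, drain() and deliver()
are hypothetical names, and in-memory queues stand in for the wire.

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class CompleteHandshakeDemo
    {
        enum State { STREAMING, WAIT_COMPLETE, COMPLETE }

        static class Side
        {
            State state = State.STREAMING;
            final Queue<String> inbox = new ArrayDeque<>();
            Side peer;

            // mirrors maybeCompleted(): all local receive/transfer tasks drained
            void drain()
            {
                if (state == State.WAIT_COMPLETE)
                    state = State.COMPLETE;     // peer finished first; close
                else
                {
                    peer.inbox.add("COMPLETE"); // handler.sendMessage(new CompleteMessage())
                    state = State.WAIT_COMPLETE;
                }
            }

            // mirrors complete(): a COMPLETE message arrived from the peer
            void deliver()
            {
                inbox.remove();
                state = (state == State.WAIT_COMPLETE) ? State.COMPLETE : State.WAIT_COMPLETE;
            }
        }

        public static void main(String[] args)
        {
            Side a = new Side(), b = new Side();
            a.peer = b; b.peer = a;
            a.drain();   // a -> WAIT_COMPLETE, COMPLETE in flight to b
            b.drain();   // b -> WAIT_COMPLETE, COMPLETE in flight to a
            a.deliver(); // a -> COMPLETE
            b.deliver(); // b -> COMPLETE
            System.out.println(a.state + " / " + b.state); // COMPLETE / COMPLETE
        }
    }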

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamState.java b/src/java/org/apache/cassandra/streaming/StreamState.java
new file mode 100644
index 0000000..db50c2a
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamState.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+/**
+ * Current snapshot of streaming progress.
+ */
+public class StreamState implements Serializable
+{
+    public final UUID planId;
+    public final String description;
+    public final Set<SessionInfo> sessions;
+
+    public StreamState(UUID planId, String description, Set<SessionInfo> sessions)
+    {
+        this.planId = planId;
+        this.description = description;
+        this.sessions = sessions;
+    }
+
+    public boolean hasFailedSession()
+    {
+        return Iterables.any(sessions, new Predicate<SessionInfo>()
+        {
+            public boolean apply(SessionInfo session)
+            {
+                return session.isFailed();
+            }
+        });
+    }
+}
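
StreamState is the value a stream operation ultimately resolves to; a caller
usually only needs hasFailedSession() to decide overall success. A hedged
sketch of such a check follows; the StreamState is constructed by hand only to
keep the snippet self-contained, normally it would come from a finished
stream operation.

    import java.util.Collections;
    import java.util.Set;
    import java.util.UUID;

    import org.apache.cassandra.streaming.SessionInfo;
    import org.apache.cassandra.streaming.StreamState;

    public class StreamStateCheck
    {
        public static void main(String[] args)
        {
            // normally obtained from a finished stream operation, not built by hand
            Set<SessionInfo> sessions = Collections.emptySet();
            StreamState state = new StreamState(UUID.randomUUID(), "Bootstrap", sessions);

            if (state.hasFailedSession())
                throw new RuntimeException("Stream plan " + state.planId + " failed");
            System.out.println("All sessions of '" + state.description + "' succeeded");
        }
    }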

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSummary.java b/src/java/org/apache/cassandra/streaming/StreamSummary.java
new file mode 100644
index 0000000..a31e333
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamSummary.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.UUID;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * Summary of streaming.
+ */
+public class StreamSummary implements Serializable
+{
+    public static final IVersionedSerializer<StreamSummary> serializer = new StreamSummarySerializer();
+
+    public final UUID cfId;
+
+    /**
+     * Number of files to transfer. Can be 0 if there is nothing to transfer for a given streaming request.
+     */
+    public final int files;
+    public final long totalSize;
+
+    public StreamSummary(UUID cfId, int files, long totalSize)
+    {
+        this.cfId = cfId;
+        this.files = files;
+        this.totalSize = totalSize;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        StreamSummary summary = (StreamSummary) o;
+        return files == summary.files && totalSize == summary.totalSize && cfId.equals(summary.cfId);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(cfId, files, totalSize);
+    }
+
+    @Override
+    public String toString()
+    {
+        final StringBuilder sb = new StringBuilder("StreamSummary{");
+        sb.append("path=").append(cfId);
+        sb.append(", files=").append(files);
+        sb.append(", totalSize=").append(totalSize);
+        sb.append('}');
+        return sb.toString();
+    }
+
+    public static class StreamSummarySerializer implements IVersionedSerializer<StreamSummary>
+    {
+        // arbitrary version is fine for UUIDSerializer for now...
+        public void serialize(StreamSummary summary, DataOutput out, int version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(summary.cfId, out, MessagingService.current_version);
+            out.writeInt(summary.files);
+            out.writeLong(summary.totalSize);
+        }
+
+        public StreamSummary deserialize(DataInput in, int version) throws IOException
+        {
+            UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+            int files = in.readInt();
+            long totalSize = in.readLong();
+            return new StreamSummary(cfId, files, totalSize);
+        }
+
+        public long serializedSize(StreamSummary summary, int version)
+        {
+            long size = UUIDSerializer.serializer.serializedSize(summary.cfId, MessagingService.current_version);
+            size += TypeSizes.NATIVE.sizeof(summary.files);
+            size += TypeSizes.NATIVE.sizeof(summary.totalSize);
+            return size;
+        }
+    }
+}
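
A round-trip sketch for the serializer above; note that, as the in-code
comment says, cfId is always written with MessagingService.current_version
regardless of the version argument.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.UUID;

    import org.apache.cassandra.net.MessagingService;
    import org.apache.cassandra.streaming.StreamSummary;

    public class StreamSummaryRoundTrip
    {
        public static void main(String[] args) throws Exception
        {
            StreamSummary in = new StreamSummary(UUID.randomUUID(), 3, 42L);

            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            StreamSummary.serializer.serialize(in, new DataOutputStream(buffer),
                                               MessagingService.current_version);

            StreamSummary out = StreamSummary.serializer.deserialize(
                    new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())),
                    MessagingService.current_version);

            if (!in.equals(out))
                throw new AssertionError("round trip mismatch: " + in + " vs " + out);
            System.out.println("round-tripped: " + out);
        }
    }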

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTask.java b/src/java/org/apache/cassandra/streaming/StreamTask.java
new file mode 100644
index 0000000..9e9e06f
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamTask.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.util.UUID;
+
+/**
+ * StreamTask is an abstraction of a streaming task performed over a specific ColumnFamily.
+ */
+public abstract class StreamTask
+{
+    /** StreamSession that this task belongs to */
+    protected final StreamSession session;
+
+    protected final UUID cfId;
+
+    protected StreamTask(StreamSession session, UUID cfId)
+    {
+        this.session = session;
+        this.cfId = cfId;
+    }
+
+    /**
+     * @return total number of files this task receives/streams.
+     */
+    public abstract int getTotalNumberOfFiles();
+
+    /**
+     * @return total number of bytes this task receives/streams.
+     */
+    public abstract long getTotalSize();
+
+    /**
+     * @return StreamSummary that describes this task
+     */
+    public StreamSummary getSummary()
+    {
+        return new StreamSummary(cfId, getTotalNumberOfFiles(), getTotalSize());
+    }
+}
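
The two concrete implementations in this patch are StreamReceiveTask and
StreamTransferTask. A hypothetical minimal subclass shows how little a
StreamTask has to provide, since getSummary() is derived entirely from the
two abstract methods:

    import java.util.UUID;

    import org.apache.cassandra.streaming.StreamSession;
    import org.apache.cassandra.streaming.StreamTask;

    // FixedSizeTask is illustrative only
    class FixedSizeTask extends StreamTask
    {
        private final int files;
        private final long totalSize;

        FixedSizeTask(StreamSession session, UUID cfId, int files, long totalSize)
        {
            super(session, cfId);
            this.files = files;
            this.totalSize = totalSize;
        }

        public int getTotalNumberOfFiles() { return files; }

        public long getTotalSize() { return totalSize; }

        // getSummary() is inherited and yields new StreamSummary(cfId, files, totalSize)
    }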

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
new file mode 100644
index 0000000..ba2df03
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.streaming.messages.FileMessage;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * StreamTransferTask sends sections of SSTable files of a certain ColumnFamily.
+ */
+public class StreamTransferTask extends StreamTask
+{
+    private final AtomicInteger sequenceNumber = new AtomicInteger(0);
+
+    private final Map<Integer, FileMessage> files = new HashMap<>();
+
+    private long totalSize;
+
+    public StreamTransferTask(StreamSession session, UUID cfId)
+    {
+        super(session, cfId);
+    }
+
+    public void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections)
+    {
+        assert sstable != null && cfId.equals(sstable.metadata.cfId);
+        FileMessage message = new FileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections);
+        files.put(message.header.sequenceNumber, message);
+        totalSize += message.header.size();
+    }
+
+    /**
+     * Marks the file with the given sequence number as completely sent.
+     *
+     * @param sequenceNumber sequence number of completed file transfer
+     */
+    public void complete(int sequenceNumber)
+    {
+        files.remove(sequenceNumber);
+        // all files sent; notify the session that this task is complete.
+        if (files.isEmpty())
+            session.taskCompleted(this);
+    }
+
+    public int getTotalNumberOfFiles()
+    {
+        return files.size();
+    }
+
+    public long getTotalSize()
+    {
+        return totalSize;
+    }
+
+    public Collection<FileMessage> getFileMessages()
+    {
+        return files.values();
+    }
+
+    public FileMessage createMessageForRetry(int sequenceNumber)
+    {
+        return files.get(sequenceNumber);
+    }
+}
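
The bookkeeping pattern here is worth spelling out: outgoing files are keyed
by a monotonically increasing sequence number and kept until acknowledged, so
a RETRY can re-serve the exact cached message, and the task completes when the
map drains. A self-contained model of just that bookkeeping, with String
payloads standing in for FileMessage:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    public class TransferBookkeepingDemo
    {
        private final AtomicInteger sequenceNumber = new AtomicInteger(0);
        private final Map<Integer, String> files = new HashMap<>();

        int add(String payload)                  // addTransferFile()
        {
            int seq = sequenceNumber.getAndIncrement();
            files.put(seq, payload);
            return seq;
        }

        String retry(int seq) { return files.get(seq); }  // createMessageForRetry()

        boolean complete(int seq)                // complete()
        {
            files.remove(seq);
            return files.isEmpty();              // task done -> notify session
        }

        public static void main(String[] args)
        {
            TransferBookkeepingDemo task = new TransferBookkeepingDemo();
            int a = task.add("Data.db sections 0-100");
            int b = task.add("Data.db sections 200-300");
            System.out.println("retry payload: " + task.retry(a));
            System.out.println(task.complete(a)); // false: one file outstanding
            System.out.println(task.complete(b)); // true: would notify session
        }
    }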

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
new file mode 100644
index 0000000..04301ba
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+
+import com.google.common.util.concurrent.RateLimiter;
+import com.ning.compress.lzf.LZFOutputStream;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.DataIntegrityMetadata;
+import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * StreamWriter writes the given sections of an SSTable to the given channel.
+ */
+public class StreamWriter
+{
+    private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
+
+    protected final SSTableReader sstable;
+    protected final Collection<Pair<Long, Long>> sections;
+    protected final RateLimiter limiter = StreamManager.getRateLimiter();
+    protected final StreamSession session;
+
+    private OutputStream compressedOutput;
+
+    // allocate buffer to use for transfers only once
+    private byte[] transferBuffer;
+
+    public StreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, StreamSession session)
+    {
+        this.session = session;
+        this.sstable = sstable;
+        this.sections = sections;
+    }
+
+    /**
+     * Streams the specified sections of the SSTable data file to the given channel.
+     *
+     * StreamWriter uses LZF compression on the wire to reduce the amount of data transferred.
+     *
+     * @param channel channel this writes data to
+     * @throws IOException on any I/O error
+     */
+    public void write(WritableByteChannel channel) throws IOException
+    {
+        long totalSize = totalSize();
+        RandomAccessReader file = sstable.openDataReader();
+        ChecksumValidator validator = null;
+        if (new File(sstable.descriptor.filenameFor(Component.CRC)).exists())
+            validator = DataIntegrityMetadata.checksumValidator(sstable.descriptor);
+
+        transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
+
+        // setting up data compression stream
+        compressedOutput = new LZFOutputStream(Channels.newOutputStream(channel));
+        long progress = 0L;
+
+        try
+        {
+            // stream each of the required sections of the file
+            for (Pair<Long, Long> section : sections)
+            {
+                long start = validator == null ? section.left : validator.chunkStart(section.left);
+                int skipBytes = (int) (section.left - start);
+                // seek to the beginning of the section
+                file.seek(start);
+                if (validator != null)
+                    validator.seek(start);
+
+                // length of the section to read
+                long length = section.right - start;
+                // tracks write progress
+                long bytesTransferred = 0;
+                while (bytesTransferred < length)
+                {
+                    long lastWrite = write(file, validator, skipBytes, length, bytesTransferred);
+                    bytesTransferred += lastWrite;
+                    progress += lastWrite;
+                    session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
+                    skipBytes = 0;
+                }
+
+                // make sure that the current section is sent
+                compressedOutput.flush();
+            }
+        }
+        finally
+        {
+            // no matter what happens close file
+            FileUtils.closeQuietly(file);
+        }
+
+        // release reference only when completed successfully
+        sstable.releaseReference();
+    }
+
+    protected long totalSize()
+    {
+        long size = 0;
+        for (Pair<Long, Long> section : sections)
+            size += section.right - section.left;
+        return size;
+    }
+
+    /**
+     * Sequentially reads bytes from the file and writes them to the output stream
+     *
+     * @param reader The file reader to read from
+     * @param validator validator to verify data integrity
+     * @param start number of bytes at the beginning of the buffer to skip when transferring, but still include for validation
+     * @param length The full length that should be transferred
+     * @param bytesTransferred Number of bytes already transferred in the current section
+     *
+     * @return Number of bytes transferred in this call
+     *
+     * @throws java.io.IOException on any I/O error
+     */
+    protected long write(RandomAccessReader reader, ChecksumValidator validator, int start, long length, long bytesTransferred) throws IOException
+    {
+        int toTransfer = (int) Math.min(transferBuffer.length, length - bytesTransferred);
+        int minReadable = (int) Math.min(transferBuffer.length, reader.length() - reader.getFilePointer());
+
+        reader.readFully(transferBuffer, 0, minReadable);
+        if (validator != null)
+            validator.validate(transferBuffer, 0, minReadable);
+
+        limiter.acquire(toTransfer);
+        compressedOutput.write(transferBuffer, start, (toTransfer - start));
+
+        return toTransfer;
+    }
+}
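
One subtlety in write(WritableByteChannel) above: when a section does not
start on a checksum-chunk boundary, the reader rewinds to the chunk start so
the whole chunk can be validated, then skips the leading bytes when writing.
A self-contained illustration of the arithmetic, assuming
ChecksumValidator.chunkStart() floors a position to its chunk boundary:

    public class ChunkAlignmentDemo
    {
        // assumed semantics of ChecksumValidator.chunkStart(): floor to chunk boundary
        static long chunkStart(long position, int chunkSize)
        {
            return (position / chunkSize) * chunkSize;
        }

        public static void main(String[] args)
        {
            int chunkSize = 65536;      // validator.chunkSize
            long sectionLeft = 70000;   // section.left: first byte to transfer
            long sectionRight = 200000; // section.right

            long start = chunkStart(sectionLeft, chunkSize); // 65536: seek here
            int skipBytes = (int) (sectionLeft - start);     // 4464: validated, not sent
            long length = sectionRight - start;              // 134464: bytes to read

            System.out.printf("seek to %d, skip %d, read %d bytes total%n",
                              start, skipBytes, length);
        }
    }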

http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
index 8c2a7bc..3730b0e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
@@ -23,33 +23,24 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.UUIDSerializer;
 
-
 /**
  * Task that makes two nodes exchange (stream) some ranges (for a given table/cf).
  * This handles the case where the local node is neither of the two nodes that
@@ -72,9 +63,9 @@ public class StreamingRepairTask implements Runnable
     private final String tableName;
     private final String cfName;
     private final Collection<Range<Token>> ranges;
-    private final IStreamCallback callback;
+    private final StreamEventHandler callback;
 
-    private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, IStreamCallback callback)
+    private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, StreamEventHandler callback)
     {
         this.id = id;
         this.owner = owner;
@@ -121,21 +112,15 @@ public class StreamingRepairTask implements Runnable
 
     private void initiateStreaming()
     {
-        ColumnFamilyStore cfstore = Table.open(tableName).getColumnFamilyStore(cfName);
-        try
-        {
-            logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", id, ranges.size(), dst));
-            Collection<ColumnFamilyStore> cfses = Collections.singleton(cfstore);
-            // send ranges to the remote node
-            StreamOutSession outsession = StreamOutSession.create(tableName, dst, callback);
-            StreamOut.transferRanges(outsession, cfses, ranges, OperationType.AES, false);
-            // request ranges from the remote node
-            StreamIn.requestRanges(dst, tableName, cfses, ranges, callback, OperationType.AES);
-        }
-        catch(Exception e)
-        {
-            throw new RuntimeException("Streaming repair failed", e);
-        }
+        logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", id, ranges.size(), dst));
+        StreamResultFuture op = new StreamPlan("Repair")
+                                    .flushBeforeTransfer(true)
+                                    // request ranges from the remote node
+                                    .requestRanges(dst, tableName, ranges, cfName)
+                                    // send ranges to the remote node
+                                    .transferRanges(dst, tableName, ranges, cfName)
+                                    .execute();
+        op.addEventListener(callback);
     }
 
     private void forwardToSource()
@@ -147,45 +132,34 @@ public class StreamingRepairTask implements Runnable
         MessagingService.instance().sendOneWay(msg, src);
     }
 
-    private static IStreamCallback makeReplyingCallback(final InetAddress taskOwner, final UUID taskId)
+    private static StreamEventHandler makeReplyingCallback(final InetAddress taskOwner, final UUID taskId)
     {
-        return new IStreamCallback()
+        return new StreamEventHandler()
         {
-            // we expect one callback for the receive, and one for the send
-            private final AtomicInteger outstanding = new AtomicInteger(2);
-
-            public void onSuccess()
+            public void onSuccess(StreamState finalState)
             {
-                if (outstanding.decrementAndGet() > 0)
-                    return; // waiting on more calls
-
                 StreamingRepairResponse.reply(taskOwner, taskId);
             }
 
-            public void onFailure() {}
+            public void onFailure(Throwable t) {}
+            public void handleStreamEvent(StreamEvent event) {}
         };
     }
 
     // wrap a given callback so as to unregister the streaming repair task on completion
-    private static IStreamCallback wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask)
+    private static StreamEventHandler wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask)
     {
-        return new IStreamCallback()
+        return new StreamEventHandler()
         {
-            // we expect one callback for the receive, and one for the send
-            private final AtomicInteger outstanding = new AtomicInteger(isLocalTask ? 2 : 1);
-
-            public void onSuccess()
+            public void onSuccess(StreamState finalState)
             {
-                if (outstanding.decrementAndGet() > 0)
-                    // waiting on more calls
-                    return;
-
                 tasks.remove(taskid);
                 if (callback != null)
                     callback.run();
             }
 
-            public void onFailure() {}
+            public void onFailure(Throwable t) {}
+            public void handleStreamEvent(StreamEvent event) {}
         };
     }
 
@@ -220,7 +194,10 @@ public class StreamingRepairTask implements Runnable
 
             logger.info(String.format("[streaming task #%s] task succeeded", task.id));
             if (task.callback != null)
-                task.callback.onSuccess();
+            {
+                // TODO null
+                task.callback.onSuccess(null);
+            }
         }
 
         private static void reply(InetAddress remote, UUID taskid)