Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/20 19:07:15 UTC
[3/5] Streaming 2.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
new file mode 100644
index 0000000..78d50ad
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.*;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * {@link StreamPlan} is a helper class that builds a stream operation with the given configuration.
+ *
+ * This is the class to use for building a streaming plan and starting streaming.
+ */
+public class StreamPlan
+{
+ private final UUID planId = UUIDGen.getTimeUUID();
+ private final String description;
+
+ // sessions per InetAddress of the other end.
+ private final Map<InetAddress, StreamSession> sessions = new HashMap<>();
+
+ private boolean flushBeforeTransfer = true;
+
+ /**
+ * Start building a stream plan.
+ *
+ * @param description stream operation type that describes this StreamPlan
+ */
+ public StreamPlan(String description)
+ {
+ this.description = description;
+ }
+
+ /**
+ * Request data in {@code keyspace} and {@code ranges} from a specific node.
+ *
+ * @param from endpoint address to fetch data from.
+ * @param keyspace name of keyspace
+ * @param ranges ranges to fetch
+ * @return this object for chaining
+ */
+ public StreamPlan requestRanges(InetAddress from, String keyspace, Collection<Range<Token>> ranges)
+ {
+ return requestRanges(from, keyspace, ranges, new String[0]);
+ }
+
+ /**
+ * Request data in {@code columnFamilies} under {@code keyspace} and {@code ranges} from a specific node.
+ *
+ * @param from endpoint address to fetch data from.
+ * @param keyspace name of keyspace
+ * @param ranges ranges to fetch
+ * @param columnFamilies specific column families
+ * @return this object for chaining
+ */
+ public StreamPlan requestRanges(InetAddress from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+ {
+ StreamSession session = getOrCreateSession(from);
+ session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies));
+ return this;
+ }
+
+ /**
+ * Add a transfer task to send data of a specific keyspace and ranges.
+ *
+ * @param to endpoint address of receiver
+ * @param keyspace name of keyspace
+ * @param ranges ranges to send
+ * @return this object for chaining
+ */
+ public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges)
+ {
+ return transferRanges(to, keyspace, ranges, new String[0]);
+ }
+
+ /**
+ * Add a transfer task to send data of the specific {@code columnFamilies} under {@code keyspace} and {@code ranges}.
+ *
+ * @param to endpoint address of receiver
+ * @param keyspace name of keyspace
+ * @param ranges ranges to send
+ * @param columnFamilies specific column families
+ * @return this object for chaining
+ */
+ public StreamPlan transferRanges(InetAddress to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+ {
+ StreamSession session = getOrCreateSession(to);
+ session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer);
+ return this;
+ }
+
+ /**
+ * Add a transfer task to send the given SSTable files.
+ *
+ * @param to endpoint address of receiver
+ * @param ranges ranges to send
+ * @param sstables files to send
+ * @return this object for chaining
+ */
+ public StreamPlan transferFiles(InetAddress to, Collection<Range<Token>> ranges, Collection<SSTableReader> sstables)
+ {
+ StreamSession session = getOrCreateSession(to);
+ session.addTransferFiles(ranges, sstables);
+ return this;
+ }
+
+ /**
+ * @return true if this plan has nothing to execute
+ */
+ public boolean isEmpty()
+ {
+ return sessions.isEmpty();
+ }
+
+ /**
+ * Execute this {@link StreamPlan} asynchronously.
+ *
+ * @return future {@link StreamState} that you can use to track the progress of streaming.
+ */
+ public StreamResultFuture execute()
+ {
+ return StreamResultFuture.startStreamingAsync(planId, description, sessions.values());
+ }
+
+ /**
+ * Set the flushBeforeTransfer option.
+ * When true, memtables are flushed before streaming ranges. (Default: true)
+ *
+ * @param flushBeforeTransfer set to true when the node should flush before transfer
+ * @return this object for chaining
+ */
+ public StreamPlan flushBeforeTransfer(boolean flushBeforeTransfer)
+ {
+ this.flushBeforeTransfer = flushBeforeTransfer;
+ return this;
+ }
+
+ private StreamSession getOrCreateSession(InetAddress peer)
+ {
+ StreamSession session = sessions.get(peer);
+ if (session == null)
+ {
+ session = new StreamSession(peer);
+ sessions.put(peer, session);
+ }
+ return session;
+ }
+
+}
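As a usage sketch of the fluent API above (the "Rebuild" description, keyspace and
column family names are hypothetical; the peers and ranges are assumed to come from
the caller):

    public StreamState fetchAndForward(InetAddress source, InetAddress target,
                                       Collection<Range<Token>> ranges) throws Exception
    {
        return new StreamPlan("Rebuild")
               .requestRanges(source, "Keyspace1", ranges)                // pull these ranges from source
               .transferRanges(target, "Keyspace1", ranges, "Standard1")  // push them on to target
               .flushBeforeTransfer(true)
               .execute()   // returns the StreamResultFuture immediately
               .get();      // block until the final StreamState is available
    }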
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
new file mode 100644
index 0000000..a01de3c
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collection;
+import java.util.UUID;
+
+import com.google.common.base.Throwables;
+import com.ning.compress.lzf.LZFInputStream;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * StreamReader reads from a stream and writes to an SSTable.
+ */
+public class StreamReader
+{
+ protected final UUID cfId;
+ protected final long estimatedKeys;
+ protected final Collection<Pair<Long, Long>> sections;
+ protected final StreamSession session;
+
+ protected Descriptor desc;
+
+ public StreamReader(FileMessageHeader header, StreamSession session)
+ {
+ this.session = session;
+ this.cfId = header.cfId;
+ this.estimatedKeys = header.estimatedKeys;
+ this.sections = header.sections;
+ }
+
+ /**
+ * @param channel the channel to read data from
+ * @return the transferred SSTable
+ * @throws IOException if reading the remote sstable fails. A RuntimeException is thrown if the local write fails.
+ */
+ public SSTableReader read(ReadableByteChannel channel) throws IOException
+ {
+ long totalSize = totalSize();
+
+ Pair<String, String> kscf = Schema.instance.getCF(cfId);
+ ColumnFamilyStore cfs = Table.open(kscf.left).getColumnFamilyStore(kscf.right);
+ Directories.DataDirectory localDir = cfs.directories.getLocationCapableOfSize(totalSize);
+ if (localDir == null)
+ throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
+ desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
+
+ SSTableWriter writer = new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys);
+ try
+ {
+ DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
+ BytesReadTracker in = new BytesReadTracker(dis);
+
+ while (in.getBytesRead() < totalSize)
+ {
+ writeRow(writer, in, cfs);
+ // TODO move this to BytesReadTracker
+ session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
+ }
+ return writer.closeAndOpenReader();
+ }
+ catch (Throwable e)
+ {
+ writer.abort();
+ if (e instanceof IOException)
+ throw (IOException) e;
+ else
+ throw Throwables.propagate(e);
+ }
+ }
+
+ protected long totalSize()
+ {
+ long size = 0;
+ for (Pair<Long, Long> section : sections)
+ size += section.right - section.left;
+ return size;
+ }
+
+ protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
+ {
+ DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+ writer.appendFromStream(key, cfs.metadata, in);
+ cfs.invalidateCachedRow(key);
+ }
+}
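Each section above is a (start, end) pair of byte offsets into the source sstable's
data file, so totalSize() is the number of bytes expected after decompression, which
is what the BytesReadTracker in read() counts against. A worked example with
illustrative offsets:

    Collection<Pair<Long, Long>> sections = Arrays.asList(Pair.create(0L, 100L),
                                                          Pair.create(250L, 400L));
    long total = 0;
    for (Pair<Long, Long> section : sections)
        total += section.right - section.left;  // (100 - 0) + (400 - 250) = 250 bytes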
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
new file mode 100644
index 0000000..af8c138
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Task that manages receiving files for the session for a certain ColumnFamily.
+ */
+public class StreamReceiveTask extends StreamTask
+{
+ // number of files to receive
+ private final int totalFiles;
+ // total size of files to receive
+ private final long totalSize;
+
+ // holds references to SSTables received
+ protected Collection<SSTableReader> sstables;
+
+ public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
+ {
+ super(session, cfId);
+ this.totalFiles = totalFiles;
+ this.totalSize = totalSize;
+ this.sstables = new ArrayList<>(totalFiles);
+ }
+
+ /**
+ * Process received file.
+ *
+ * @param sstable SSTable file received.
+ */
+ public void receive(SSTableReader sstable)
+ {
+ assert cfId.equals(sstable.metadata.cfId);
+
+ sstables.add(sstable);
+ if (sstables.size() == totalFiles)
+ complete();
+ }
+
+ public int getTotalNumberOfFiles()
+ {
+ return totalFiles;
+ }
+
+ public long getTotalSize()
+ {
+ return totalSize;
+ }
+
+ // TODO should be run in background so that this does not block streaming
+ private void complete()
+ {
+ if (!SSTableReader.acquireReferences(sstables))
+ throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred");
+ try
+ {
+ Pair<String, String> kscf = Schema.instance.getCF(cfId);
+ ColumnFamilyStore cfs = Table.open(kscf.left).getColumnFamilyStore(kscf.right);
+ // add sstables and build secondary indexes
+ cfs.addSSTables(sstables);
+ cfs.indexManager.maybeBuildSecondaryIndexes(sstables, cfs.indexManager.allIndexesNames());
+ }
+ finally
+ {
+ SSTableReader.releaseReferences(sstables);
+ }
+
+ session.taskCompleted(this);
+ }
+}
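To illustrate the counting contract of receive() (the session, cfId and sstable
references are assumed to exist already):

    StreamReceiveTask task = new StreamReceiveTask(session, cfId, 2, totalSize);
    task.receive(first);   // 1 of 2 files received: task stays pending
    task.receive(second);  // 2 of 2: complete() adds both sstables to the ColumnFamilyStore,
                           // builds secondary indexes and notifies the session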
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReply.java b/src/java/org/apache/cassandra/streaming/StreamReply.java
deleted file mode 100644
index d54e9cc..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamReply.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-public class StreamReply
-{
- static enum Status
- {
- FILE_FINISHED,
- FILE_RETRY,
- SESSION_FINISHED,
- SESSION_FAILURE,
- }
-
- public static final IVersionedSerializer<StreamReply> serializer = new FileStatusSerializer();
-
- public final UUID sessionId;
- public final String file;
- public final Status action;
-
- public StreamReply(String file, UUID sessionId, Status action)
- {
- this.file = file;
- this.action = action;
- this.sessionId = sessionId;
- }
-
- public MessageOut<StreamReply> createMessage()
- {
- return new MessageOut<StreamReply>(MessagingService.Verb.STREAM_REPLY, this, serializer);
- }
-
- @Override
- public String toString()
- {
- return "StreamReply(" +
- "sessionId=" + sessionId +
- ", file='" + file + '\'' +
- ", action=" + action +
- ')';
- }
-
- private static class FileStatusSerializer implements IVersionedSerializer<StreamReply>
- {
- public void serialize(StreamReply reply, DataOutput out, int version) throws IOException
- {
- UUIDSerializer.serializer.serialize(reply.sessionId, out, MessagingService.current_version);
- out.writeUTF(reply.file);
- out.writeInt(reply.action.ordinal());
- }
-
- public StreamReply deserialize(DataInput in, int version) throws IOException
- {
- UUID sessionId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
- String targetFile = in.readUTF();
- Status action = Status.values()[in.readInt()];
- return new StreamReply(targetFile, sessionId, action);
- }
-
- public long serializedSize(StreamReply reply, int version)
- {
- return TypeSizes.NATIVE.sizeof(reply.sessionId) + TypeSizes.NATIVE.sizeof(reply.file) + TypeSizes.NATIVE.sizeof(reply.action.ordinal());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
deleted file mode 100644
index b69b6d0..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-
-public class StreamReplyVerbHandler implements IVerbHandler<StreamReply>
-{
- private static final Logger logger = LoggerFactory.getLogger(StreamReplyVerbHandler.class);
-
- public void doVerb(MessageIn<StreamReply> message, int id)
- {
- StreamReply reply = message.payload;
- logger.debug("Received StreamReply {}", reply);
- StreamOutSession session = StreamOutSession.get(reply.sessionId);
- if (session == null)
- {
- logger.debug("Received stream action " + reply.action + " for an unknown session from " + message.from);
- return;
- }
-
- switch (reply.action)
- {
- case FILE_FINISHED:
- logger.info("Successfully sent {} to {}", reply.file, message.from);
- session.validateCurrentFile(reply.file);
- session.startNext();
- break;
- case FILE_RETRY:
- session.validateCurrentFile(reply.file);
- logger.info("Need to re-stream file {} to {}", reply.file, message.from);
- session.retry();
- break;
- case SESSION_FINISHED:
- session.close(true);
- break;
- case SESSION_FAILURE:
- session.close(false);
- break;
- default:
- throw new RuntimeException("Cannot handle FileStatus.Action: " + reply.action);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java
index 465d8bf..9d3fdb2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequest.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -20,173 +20,77 @@ package org.apache.cassandra.streaming;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
-import java.util.UUID;
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDSerializer;
-/**
-* This class encapsulates the message that needs to be sent to nodes
-* that handoff data. The message contains information about ranges
-* that need to be transferred and the target node.
-*
-* If a file is specified, ranges and table will not. vice-versa should hold as well.
-*/
public class StreamRequest
{
public static final IVersionedSerializer<StreamRequest> serializer = new StreamRequestSerializer();
- protected final UUID sessionId;
- protected final InetAddress target;
-
- // if this is specified, ranges and table should not be.
- protected final PendingFile file;
+ public final String keyspace;
+ public final Collection<Range<Token>> ranges;
+ public final Collection<String> columnFamilies = new HashSet<>();
- // if these are specified, file should not be.
- protected final Collection<Range<Token>> ranges;
- protected final String table;
- protected final Iterable<ColumnFamilyStore> columnFamilies;
- protected final OperationType type;
-
- StreamRequest(InetAddress target, Collection<Range<Token>> ranges, String table, Iterable<ColumnFamilyStore> columnFamilies, UUID sessionId, OperationType type)
+ public StreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies)
{
- this.target = target;
+ this.keyspace = keyspace;
this.ranges = ranges;
- this.table = table;
- this.columnFamilies = columnFamilies;
- this.sessionId = sessionId;
- this.type = type;
- file = null;
- }
-
- StreamRequest(InetAddress target, PendingFile file, UUID sessionId)
- {
- this.target = target;
- this.file = file;
- this.sessionId = sessionId;
- this.type = file.type;
- ranges = null;
- table = null;
- columnFamilies = null;
- }
-
- public MessageOut<StreamRequest> createMessage()
- {
- return new MessageOut<StreamRequest>(MessagingService.Verb.STREAM_REQUEST, this, serializer);
+ this.columnFamilies.addAll(columnFamilies);
}
- public String toString()
+ public static class StreamRequestSerializer implements IVersionedSerializer<StreamRequest>
{
- StringBuilder sb = new StringBuilder("");
- if (file == null)
+ public void serialize(StreamRequest request, DataOutput out, int version) throws IOException
{
- sb.append(table);
- sb.append("@");
- sb.append(columnFamilies.toString());
- sb.append("@");
- sb.append(target);
- sb.append("------->");
- for (Range<Token> range : ranges)
+ out.writeUTF(request.keyspace);
+ out.writeInt(request.ranges.size());
+ for (Range<Token> range : request.ranges)
{
- sb.append(range);
- sb.append(" ");
+ Token.serializer.serialize(range.left, out);
+ Token.serializer.serialize(range.right, out);
}
- sb.append(type);
- }
- else
- {
- sb.append(file.toString());
+ out.writeInt(request.columnFamilies.size());
+ for (String cf : request.columnFamilies)
+ out.writeUTF(cf);
}
- return sb.toString();
- }
- private static class StreamRequestSerializer implements IVersionedSerializer<StreamRequest>
- {
- public void serialize(StreamRequest srm, DataOutput out, int version) throws IOException
+ public StreamRequest deserialize(DataInput in, int version) throws IOException
{
- UUIDSerializer.serializer.serialize(srm.sessionId, out, MessagingService.current_version);
- CompactEndpointSerializationHelper.serialize(srm.target, out);
- if (srm.file != null)
+ String keyspace = in.readUTF();
+ int rangeCount = in.readInt();
+ List<Range<Token>> ranges = new ArrayList<>(rangeCount);
+ for (int i = 0; i < rangeCount; i++)
{
- out.writeBoolean(true);
- PendingFile.serializer.serialize(srm.file, out, version);
- }
- else
- {
- out.writeBoolean(false);
- out.writeUTF(srm.table);
- out.writeInt(srm.ranges.size());
- for (Range<Token> range : srm.ranges)
- AbstractBounds.serializer.serialize(range, out, version);
-
- out.writeUTF(srm.type.name());
-
- out.writeInt(Iterables.size(srm.columnFamilies));
- for (ColumnFamilyStore cfs : srm.columnFamilies)
- ColumnFamily.serializer.serializeCfId(cfs.metadata.cfId, out, version);
+ Token left = Token.serializer.deserialize(in);
+ Token right = Token.serializer.deserialize(in);
+ ranges.add(new Range<>(left, right));
}
+ int cfCount = in.readInt();
+ List<String> columnFamilies = new ArrayList<>(cfCount);
+ for (int i = 0; i < cfCount; i++)
+ columnFamilies.add(in.readUTF());
+ return new StreamRequest(keyspace, ranges, columnFamilies);
}
- public StreamRequest deserialize(DataInput in, int version) throws IOException
+ public long serializedSize(StreamRequest request, int version)
{
- UUID sessionId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
- InetAddress target = CompactEndpointSerializationHelper.deserialize(in);
- boolean singleFile = in.readBoolean();
- if (singleFile)
+ int size = TypeSizes.NATIVE.sizeof(request.keyspace);
+ size += TypeSizes.NATIVE.sizeof(request.ranges.size());
+ for (Range<Token> range : request.ranges)
{
- PendingFile file = PendingFile.serializer.deserialize(in, version);
- return new StreamRequest(target, file, sessionId);
+ size += Token.serializer.serializedSize(range.left, TypeSizes.NATIVE);
+ size += Token.serializer.serializedSize(range.right, TypeSizes.NATIVE);
}
- else
- {
- String table = in.readUTF();
- int size = in.readInt();
- List<Range<Token>> ranges = (size == 0) ? null : new ArrayList<Range<Token>>(size);
- for (int i = 0; i < size; ++i)
- ranges.add((Range<Token>) AbstractBounds.serializer.deserialize(in, version).toTokenBounds());
- OperationType type = OperationType.valueOf(in.readUTF());
-
- List<ColumnFamilyStore> stores = new ArrayList<ColumnFamilyStore>();
- int cfsSize = in.readInt();
- for (int i = 0; i < cfsSize; ++i)
- stores.add(Table.open(table).getColumnFamilyStore(ColumnFamily.serializer.deserializeCfId(in, version)));
-
- return new StreamRequest(target, ranges, table, stores, sessionId, type);
- }
- }
-
- public long serializedSize(StreamRequest sr, int version)
- {
- long size = TypeSizes.NATIVE.sizeof(sr.sessionId);
- size += CompactEndpointSerializationHelper.serializedSize(sr.target);
- size += TypeSizes.NATIVE.sizeof(true);
- if (sr.file != null)
- return size + PendingFile.serializer.serializedSize(sr.file, version);
-
- size += TypeSizes.NATIVE.sizeof(sr.table);
- size += TypeSizes.NATIVE.sizeof(sr.ranges.size());
- for (Range<Token> range : sr.ranges)
- size += AbstractBounds.serializer.serializedSize(range, version);
- size += TypeSizes.NATIVE.sizeof(sr.type.name());
- size += TypeSizes.NATIVE.sizeof(Iterables.size(sr.columnFamilies));
- for (ColumnFamilyStore cfs : sr.columnFamilies)
- size += ColumnFamily.serializer.cfIdSerializedSize(cfs.metadata.cfId, TypeSizes.NATIVE, version);
+ size += TypeSizes.NATIVE.sizeof(request.columnFamilies.size());
+ for (String cf : request.columnFamilies)
+ size += TypeSizes.NATIVE.sizeof(cf);
return size;
}
}
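Read straight off the serializer above, the new StreamRequest wire format is:

    keyspace           UTF string
    number of ranges   int
    for each range     left token, right token (via Token.serializer)
    number of CFs      int
    for each CF        UTF string name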
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
deleted file mode 100644
index bbed34d..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.cassandra.streaming;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-
-/**
- * This verb handler handles the StreamRequestMessage that is sent by
- * the node requesting range transfer.
-*/
-public class StreamRequestVerbHandler implements IVerbHandler<StreamRequest>
-{
- private static final Logger logger = LoggerFactory.getLogger(StreamRequestVerbHandler.class);
-
- public void doVerb(MessageIn<StreamRequest> message, int id)
- {
- if (logger.isDebugEnabled())
- logger.debug("Received a StreamRequestMessage from {}", message.from);
-
- StreamRequest srm = message.payload;
- if (logger.isDebugEnabled())
- logger.debug(srm.toString());
-
- StreamOutSession session = StreamOutSession.create(srm.table, message.from, srm.sessionId);
- StreamOut.transferRanges(session, srm.columnFamilies, srm.ranges, srm.type);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
new file mode 100644
index 0000000..84332bd
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * StreamResultFuture asynchronously returns the final {@link StreamState} of execution of {@link StreamPlan}.
+ * <p>
+ * You can attach a {@link StreamEventHandler} to this object to listen for {@link StreamEvent}s and track the progress of streaming.
+ */
+public final class StreamResultFuture extends AbstractFuture<StreamState>
+{
+ // Executor that establishes the streaming connection. Once we're connected to the other end, the rest of the streaming
+ // is directly handled by the ConnectionHandler incoming and outgoing threads.
+ private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
+ FBUtilities.getAvailableProcessors());
+
+ public final UUID planId;
+ public final String description;
+ private final List<StreamEventHandler> eventListeners = Collections.synchronizedList(new ArrayList<StreamEventHandler>());
+ private final Set<UUID> ongoingSessions;
+ private final Map<InetAddress, SessionInfo> sessionStates = new NonBlockingHashMap<>();
+
+ /**
+ * Create a new StreamResultFuture with the given {@code planId} and description.
+ *
+ * Constructor is private. Use {@link StreamPlan#execute()} to get an instance.
+ *
+ * @param planId Stream plan ID
+ * @param description Stream description
+ * @param sessions IDs of the sessions to wait for completion
+ */
+ private StreamResultFuture(UUID planId, String description, Set<UUID> sessions)
+ {
+ this.planId = planId;
+ this.description = description;
+ this.ongoingSessions = sessions;
+
+ // if there are no sessions to listen to, set the result immediately
+ if (sessions.isEmpty())
+ set(getCurrentState());
+ }
+
+ static StreamResultFuture startStreamingAsync(UUID planId, String description, Collection<StreamSession> sessions)
+ {
+ Set<UUID> sessionsIds = new HashSet<>(sessions.size());
+ for (StreamSession session : sessions)
+ sessionsIds.add(session.id);
+
+ StreamResultFuture future = new StreamResultFuture(planId, description, sessionsIds);
+
+ StreamManager.instance.register(future);
+
+ // start sessions
+ for (StreamSession session : sessions)
+ {
+ session.register(future);
+ // register to gossiper/FD to fail on node failure
+ Gossiper.instance.register(session);
+ FailureDetector.instance.registerFailureDetectionEventListener(session);
+ streamExecutor.submit(session);
+ }
+ return future;
+ }
+
+ public void addEventListener(StreamEventHandler listener)
+ {
+ Futures.addCallback(this, listener);
+ eventListeners.add(listener);
+ }
+
+ /**
+ * @return Current snapshot of streaming progress.
+ */
+ public StreamState getCurrentState()
+ {
+ return new StreamState(planId, description, ImmutableSet.copyOf(sessionStates.values()));
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ StreamResultFuture that = (StreamResultFuture) o;
+ return planId.equals(that.planId);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return planId.hashCode();
+ }
+
+ void handleSessionPrepared(StreamSession session)
+ {
+ SessionInfo sessionInfo = session.getSessionInfo();
+ StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo);
+ sessionStates.put(sessionInfo.peer, sessionInfo);
+ fireStreamEvent(event);
+ }
+
+ void handleSessionComplete(StreamSession session)
+ {
+ Gossiper.instance.unregister(session);
+ FailureDetector.instance.unregisterFailureDetectionEventListener(session);
+
+ SessionInfo sessionInfo = session.getSessionInfo();
+ sessionStates.put(sessionInfo.peer, sessionInfo);
+ fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
+ maybeComplete(session.id);
+ }
+
+ public void handleProgress(ProgressInfo progress)
+ {
+ sessionStates.get(progress.peer).updateProgress(progress);
+ fireStreamEvent(new StreamEvent.ProgressEvent(planId, progress));
+ }
+
+ void fireStreamEvent(StreamEvent event)
+ {
+ // delegate to listener
+ for (StreamEventHandler listener : eventListeners)
+ listener.handleStreamEvent(event);
+ }
+
+ private synchronized void maybeComplete(UUID sessionId)
+ {
+ ongoingSessions.remove(sessionId);
+ if (ongoingSessions.isEmpty())
+ {
+ StreamState finalState = getCurrentState();
+ if (finalState.hasFailedSession())
+ setException(new StreamException(finalState, "Stream failed"));
+ else
+ set(finalState);
+ }
+ }
+}
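A sketch of tracking progress through a listener; it assumes StreamEventHandler
extends FutureCallback<StreamState> (as the Futures.addCallback call above implies)
and that ProgressEvent exposes its ProgressInfo as a public "progress" field:

    future.addEventListener(new StreamEventHandler()
    {
        public void handleStreamEvent(StreamEvent event)
        {
            if (event instanceof StreamEvent.ProgressEvent)  // fired by handleProgress above
                System.out.println(((StreamEvent.ProgressEvent) event).progress);
        }

        public void onSuccess(StreamState finalState)
        {
            System.out.println("Streaming " + finalState.planId + " finished");
        }

        public void onFailure(Throwable t)
        {
            System.err.println("Streaming failed: " + t);
        }
    });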
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
new file mode 100644
index 0000000..7d96f43
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.*;
+import java.util.concurrent.Future;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.*;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.metrics.StreamingMetrics;
+import org.apache.cassandra.streaming.messages.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * StreamSession is the center of the Cassandra Streaming API.
+ *
+ * StreamSessions on both endpoints exchange messages and files until the session completes.
+ *
+ * It is created through {@link StreamPlan} on the initiator node,
+ * and is created directly from the connected socket on the other end when the init message is received.
+ *
+ * <p>
+ * StreamSession goes through several stages:
+ * <ol>
+ * <li>
+ * Init
+ * <p>The StreamSession on one end sends an init message to the other end.</p>
+ * </li>
+ * <li>
+ * Prepare
+ * <p>StreamSessions on both endpoints are created; in this phase, they exchange
+ * request and summary messages to prepare for receiving/streaming files in the next phase.</p>
+ * </li>
+ * <li>
+ * Stream
+ * <p>StreamSessions on both ends stream and receive files.</p>
+ * </li>
+ * <li>
+ * Complete
+ * <p>The session completes once both endpoints have finished and exchanged complete messages.</p>
+ * </li>
+ * </ol>
+ */
+public class StreamSession implements Runnable, IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+{
+ private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
+
+ public final UUID id = UUIDGen.getTimeUUID();
+ public final InetAddress peer;
+
+ // should not be null when session is started
+ private StreamResultFuture streamResult;
+
+ // stream requests to send to the peer
+ private final List<StreamRequest> requests = new ArrayList<>();
+ // streaming tasks are created and managed per ColumnFamily ID
+ private final Map<UUID, StreamTransferTask> transfers = new HashMap<>();
+ // data receivers, filled after receiving prepare message
+ private final Map<UUID, StreamReceiveTask> receivers = new HashMap<>();
+ private final StreamingMetrics metrics;
+
+ public final ConnectionHandler handler;
+
+ private int retries;
+
+ public static enum State
+ {
+ INITIALIZING,
+ PREPARING,
+ STREAMING,
+ WAIT_COMPLETE,
+ COMPLETE,
+ FAILED,
+ }
+
+ private volatile State state = State.INITIALIZING;
+
+ /**
+ * Create a new streaming session with the peer.
+ *
+ * @param peer Address of streaming peer
+ */
+ public StreamSession(InetAddress peer)
+ {
+ this.peer = peer;
+ this.handler = new ConnectionHandler(this);
+ this.metrics = StreamingMetrics.get(peer);
+ }
+
+ /**
+ * Create a streaming session from an established connection.
+ *
+ * @param socket established connection
+ * @param protocolVersion Streaming protocol version
+ */
+ public StreamSession(Socket socket, int protocolVersion)
+ {
+ this.peer = socket.getInetAddress();
+ this.handler = new ConnectionHandler(this, socket, protocolVersion);
+ this.metrics = StreamingMetrics.get(peer);
+ }
+
+ public UUID planId()
+ {
+ return streamResult == null ? null : streamResult.planId;
+ }
+
+ public String description()
+ {
+ return streamResult == null ? null : streamResult.description;
+ }
+
+ public static StreamSession startReceivingStreamAsync(UUID planId, String description, Socket socket, int version)
+ {
+ StreamSession session = new StreamSession(socket, version);
+ StreamResultFuture.startStreamingAsync(planId, description, Collections.singleton(session));
+ return session;
+ }
+
+ /**
+ * Bind this session to report to a specific {@link StreamResultFuture}.
+ *
+ * @param streamResult result to report to
+ * @return this object for chaining
+ */
+ public StreamSession register(StreamResultFuture streamResult)
+ {
+ this.streamResult = streamResult;
+ return this;
+ }
+
+ /**
+ * Add a data fetch request to this session.
+ *
+ * @param keyspace Keyspace to fetch data from
+ * @param ranges Ranges of data to retrieve
+ * @param columnFamilies ColumnFamily names. Can be empty to request all CFs under the keyspace.
+ */
+ public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies)
+ {
+ requests.add(new StreamRequest(keyspace, ranges, columnFamilies));
+ }
+
+ /**
+ * Set up transfer for specific keyspace/ranges/CFs
+ *
+ * @param keyspace Transfer keyspace
+ * @param ranges Transfer ranges
+ * @param columnFamilies Transfer ColumnFamilies
+ * @param flushTables when true, flush memtables before creating transfer tasks
+ */
+ public void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables)
+ {
+ Collection<ColumnFamilyStore> stores = new HashSet<>();
+ // if columnfamilies are not specified, we add all cf under the keyspace
+ if (columnFamilies.isEmpty())
+ {
+ stores.addAll(Table.open(keyspace).getColumnFamilyStores());
+ }
+ else
+ {
+ for (String cf : columnFamilies)
+ stores.add(Table.open(keyspace).getColumnFamilyStore(cf));
+ }
+
+ if (flushTables)
+ flushSSTables(stores);
+
+ List<SSTableReader> sstables = Lists.newLinkedList();
+ for (ColumnFamilyStore cfStore : stores)
+ {
+ List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
+ for (Range<Token> range : ranges)
+ rowBoundsList.add(range.toRowBounds());
+ ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
+ sstables.addAll(view.sstables);
+ }
+ addTransferFiles(ranges, sstables);
+ }
+
+ /**
+ * Set up transfer of the specific SSTables.
+ * {@code sstables} must be marked as referenced so that they do not get deleted until the transfer completes.
+ *
+ * @param ranges Transfer ranges
+ * @param sstables Transfer files
+ */
+ public void addTransferFiles(Collection<Range<Token>> ranges, Collection<SSTableReader> sstables)
+ {
+ for (SSTableReader sstable : sstables)
+ {
+ List<Pair<Long, Long>> sections = sstable.getPositionsForRanges(ranges);
+ if (sections.isEmpty())
+ {
+ // A reference was acquired on the sstable and we won't stream it
+ sstable.releaseReference();
+ continue;
+ }
+ long estimatedKeys = sstable.estimatedKeysForRanges(ranges);
+ UUID cfId = sstable.metadata.cfId;
+ StreamTransferTask task = transfers.get(cfId);
+ if (task == null)
+ {
+ task = new StreamTransferTask(this, cfId);
+ transfers.put(cfId, task);
+ }
+ task.addTransferFile(sstable, estimatedKeys, sections);
+ }
+ }
+
+ /**
+ * Start this stream session.
+ */
+ public void run()
+ {
+ assert streamResult != null : "No result is associated with this session";
+
+ try
+ {
+ if (handler.isConnected())
+ {
+ // if this session was created from the remote side, the connection is already established
+ handler.start();
+ }
+ else
+ {
+ if (requests.isEmpty() && transfers.isEmpty())
+ {
+ logger.debug("Session does not have any tasks.");
+ state(State.COMPLETE);
+ streamResult.handleSessionComplete(this);
+ }
+ else
+ {
+ handler.connect();
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ onError(e);
+ }
+ }
+
+ /**
+ * Set current state to {@code newState}.
+ *
+ * @param newState new state to set
+ */
+ public void state(State newState)
+ {
+ state = newState;
+ }
+
+ /**
+ * @return current state
+ */
+ public State state()
+ {
+ return state;
+ }
+
+ /**
+ * Return whether this session completed successfully.
+ *
+ * @return true if session completed successfully.
+ */
+ public boolean isSuccess()
+ {
+ return state == State.COMPLETE;
+ }
+
+ public void messageReceived(StreamMessage message)
+ {
+ switch (message.type)
+ {
+ case PREPARE:
+ PrepareMessage msg = (PrepareMessage) message;
+ prepare(msg.requests, msg.summaries);
+ break;
+
+ case FILE:
+ receive((FileMessage) message);
+ break;
+
+ case RETRY:
+ RetryMessage retry = (RetryMessage) message;
+ retry(retry.cfId, retry.sequenceNumber);
+ break;
+
+ case COMPLETE:
+ complete();
+ break;
+
+ case SESSION_FAILED:
+ sessionFailed();
+ break;
+ }
+ }
+
+ /**
+ * Callback for connection success.
+ *
+ * When connected, the session moves to the preparing phase and sends a prepare message.
+ */
+ public void onConnect()
+ {
+ logger.debug("Connected. Sending prepare...");
+
+ // send prepare message
+ state(State.PREPARING);
+ PrepareMessage prepare = new PrepareMessage();
+ prepare.requests.addAll(requests);
+ for (StreamTransferTask task : transfers.values())
+ prepare.summaries.add(task.getSummary());
+ handler.sendMessage(prepare);
+
+ // if we don't need to prepare for receiving stream, start sending files immediately
+ if (requests.isEmpty())
+ {
+ logger.debug("Prepare complete. Start streaming files.");
+ startStreamingFiles();
+ }
+ }
+
+ /**
+ * Callback for handling exceptions during streaming.
+ *
+ * @param e thrown exception
+ */
+ public void onError(Throwable e)
+ {
+ state(State.FAILED);
+
+ logger.error("Streaming error occurred", e);
+ // send session failure message
+ handler.sendMessage(new SessionFailedMessage());
+ // fail session
+ streamResult.handleSessionComplete(this);
+ }
+
+ /**
+ * Prepare this session for sending/receiving files.
+ */
+ public void prepare(Collection<StreamRequest> requests, Collection<StreamSummary> summaries)
+ {
+ logger.debug("Start preparing this session (" + requests.size() + " requests, " + summaries.size() + " columnfamilies receiving)");
+ // prepare tasks
+ state(State.PREPARING);
+ for (StreamRequest request : requests)
+ addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true); // always flush on stream request
+ for (StreamSummary summary : summaries)
+ prepareReceiving(summary);
+
+ // send back prepare message if prepare message contains stream request
+ if (!requests.isEmpty())
+ {
+ PrepareMessage prepare = new PrepareMessage();
+ for (StreamTransferTask task : transfers.values())
+ prepare.summaries.add(task.getSummary());
+ handler.sendMessage(prepare);
+ }
+
+ // if there are files to stream
+ if (!maybeCompleted())
+ {
+ logger.debug("Prepare complete. Start streaming files.");
+ startStreamingFiles();
+ }
+ }
+
+ /**
+ * Callback after sending FileMessageHeader.
+ *
+ * @param header sent header
+ */
+ public void fileSent(FileMessageHeader header)
+ {
+ StreamingMetrics.totalOutgoingBytes.inc(header.size());
+ metrics.outgoingBytes.inc(header.size());
+ transfers.get(header.cfId).complete(header.sequenceNumber);
+ }
+
+ /**
+ * Callback after receiving FileMessageHeader.
+ *
+ * @param message received file
+ */
+ public void receive(FileMessage message)
+ {
+ StreamingMetrics.totalIncomingBytes.inc(message.header.size());
+ metrics.incomingBytes.inc(message.header.size());
+ receivers.get(message.header.cfId).receive(message.sstable);
+ }
+
+ public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total)
+ {
+ ProgressInfo progress = new ProgressInfo(peer, desc.filenameFor(Component.DATA), direction, bytes, total);
+ streamResult.handleProgress(progress);
+ }
+
+ /**
+ * Callback on receiving a {@code StreamMessage.Type.RETRY} message.
+ *
+ * @param cfId ColumnFamily ID
+ * @param sequenceNumber Sequence number to indicate which file to stream again
+ */
+ public void retry(UUID cfId, int sequenceNumber)
+ {
+ FileMessage message = transfers.get(cfId).createMessageForRetry(sequenceNumber);
+ handler.sendMessage(message);
+ }
+
+ /**
+ * Check if the session is complete on receiving a {@code StreamMessage.Type.COMPLETE} message.
+ */
+ public synchronized void complete()
+ {
+ if (state == State.WAIT_COMPLETE)
+ {
+ state(State.COMPLETE);
+ handler.close();
+ streamResult.handleSessionComplete(this);
+ }
+ else
+ {
+ state(State.WAIT_COMPLETE);
+ }
+ }
+
+ /**
+ * Callback on receiving a {@code StreamMessage.Type.SESSION_FAILED} message.
+ */
+ public synchronized void sessionFailed()
+ {
+ handler.close();
+ streamResult.handleSessionComplete(this);
+ }
+
+ public void doRetry(FileMessageHeader header, Throwable e)
+ {
+ // retry
+ retries++;
+ if (retries > DatabaseDescriptor.getMaxStreamingRetries())
+ onError(new IOException("Too many retries for " + header, e));
+ else
+ handler.sendMessage(new RetryMessage(header.cfId, header.sequenceNumber));
+ }
+
+ /**
+ * @return Current snapshot of this session info.
+ */
+ public SessionInfo getSessionInfo()
+ {
+ List<StreamSummary> receivingSummaries = Lists.newArrayList();
+ for (StreamTask receiver : receivers.values())
+ receivingSummaries.add(receiver.getSummary());
+ List<StreamSummary> transferSummaries = Lists.newArrayList();
+ for (StreamTask transfer : transfers.values())
+ transferSummaries.add(transfer.getSummary());
+ return new SessionInfo(peer, receivingSummaries, transferSummaries, state);
+ }
+
+ public synchronized void taskCompleted(StreamReceiveTask completedTask)
+ {
+ receivers.remove(completedTask.cfId);
+ maybeCompleted();
+ }
+
+ public synchronized void taskCompleted(StreamTransferTask completedTask)
+ {
+ transfers.remove(completedTask.cfId);
+ maybeCompleted();
+ }
+
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!endpoint.equals(peer))
+ return;
+
+ // We want a higher confidence in the failure detection than usual because wrongly failing a stream has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ state(State.FAILED);
+ streamResult.handleSessionComplete(this);
+ }
+
+ private boolean maybeCompleted()
+ {
+ boolean completed = receivers.isEmpty() && transfers.isEmpty();
+ if (completed)
+ {
+ if (state == State.WAIT_COMPLETE)
+ {
+ state(State.COMPLETE);
+ handler.close();
+ streamResult.handleSessionComplete(this);
+ }
+ else
+ {
+ // notify peer that this session is completed
+ handler.sendMessage(new CompleteMessage());
+ state(State.WAIT_COMPLETE);
+ }
+ }
+ return completed;
+ }
+
+
+ /**
+ * Flushes the memtables of the given column family stores
+ * and waits for the flushes to complete.
+ */
+ private void flushSSTables(Iterable<ColumnFamilyStore> stores)
+ {
+ logger.info("Flushing memtables for {}...", stores);
+ List<Future<?>> flushes = new ArrayList<>();
+ for (ColumnFamilyStore cfs : stores)
+ flushes.add(cfs.forceFlush());
+ FBUtilities.waitOnFutures(flushes);
+ }
+
+ private void prepareReceiving(StreamSummary summary)
+ {
+ logger.debug("prepare receiving " + summary);
+ if (summary.files > 0)
+ receivers.put(summary.cfId, new StreamReceiveTask(this, summary.cfId, summary.files, summary.totalSize));
+ }
+
+ private void startStreamingFiles()
+ {
+ streamResult.handleSessionPrepared(this);
+
+ state(State.STREAMING);
+ for (StreamTransferTask task : transfers.values())
+ {
+ if (task.getFileMessages().size() > 0)
+ handler.sendMessages(task.getFileMessages());
+ else
+ taskCompleted(task); // there is no file to send
+ }
+ }
+}
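Taken together, the state transitions the methods above implement are:

    INITIALIZING  -> PREPARING      on connect, or on receiving the PREPARE message
    PREPARING     -> STREAMING      when file transfer starts
    STREAMING     -> WAIT_COMPLETE  when one side finishes and sends (or receives) COMPLETE
    WAIT_COMPLETE -> COMPLETE       when the other side's COMPLETE arrives
    any state     -> FAILED         on error, or when the failure detector convicts the peer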
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamState.java b/src/java/org/apache/cassandra/streaming/StreamState.java
new file mode 100644
index 0000000..db50c2a
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamState.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+/**
+ * Current snapshot of streaming progress.
+ */
+public class StreamState implements Serializable
+{
+ public final UUID planId;
+ public final String description;
+ public final Set<SessionInfo> sessions;
+
+ public StreamState(UUID planId, String description, Set<SessionInfo> sessions)
+ {
+ this.planId = planId;
+ this.description = description;
+ this.sessions = sessions;
+ }
+
+ public boolean hasFailedSession()
+ {
+ return Iterables.any(sessions, new Predicate<SessionInfo>()
+ {
+ public boolean apply(SessionInfo session)
+ {
+ return session.isFailed();
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSummary.java b/src/java/org/apache/cassandra/streaming/StreamSummary.java
new file mode 100644
index 0000000..a31e333
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamSummary.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.UUID;
+
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * Summary of streaming.
+ */
+public class StreamSummary implements Serializable
+{
+ public static final IVersionedSerializer<StreamSummary> serializer = new StreamSummarySerializer();
+
+ public final UUID cfId;
+
+ /**
+ * Number of files to transfer. Can be 0 if there is nothing to transfer for a given streaming request.
+ */
+ public final int files;
+ public final long totalSize;
+
+ public StreamSummary(UUID cfId, int files, long totalSize)
+ {
+ this.cfId = cfId;
+ this.files = files;
+ this.totalSize = totalSize;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ StreamSummary summary = (StreamSummary) o;
+ return files == summary.files && totalSize == summary.totalSize && cfId.equals(summary.cfId);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(cfId, files, totalSize);
+ }
+
+ @Override
+ public String toString()
+ {
+ final StringBuilder sb = new StringBuilder("StreamSummary{");
+ sb.append("path=").append(cfId);
+ sb.append(", files=").append(files);
+ sb.append(", totalSize=").append(totalSize);
+ sb.append('}');
+ return sb.toString();
+ }
+
+ public static class StreamSummarySerializer implements IVersionedSerializer<StreamSummary>
+ {
+ // arbitrary version is fine for UUIDSerializer for now...
+ public void serialize(StreamSummary summary, DataOutput out, int version) throws IOException
+ {
+ UUIDSerializer.serializer.serialize(summary.cfId, out, MessagingService.current_version);
+ out.writeInt(summary.files);
+ out.writeLong(summary.totalSize);
+ }
+
+ public StreamSummary deserialize(DataInput in, int version) throws IOException
+ {
+ UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+ int files = in.readInt();
+ long totalSize = in.readLong();
+ return new StreamSummary(cfId, files, totalSize);
+ }
+
+ public long serializedSize(StreamSummary summary, int version)
+ {
+ long size = UUIDSerializer.serializer.serializedSize(summary.cfId, MessagingService.current_version);
+ size += TypeSizes.NATIVE.sizeof(summary.files);
+ size += TypeSizes.NATIVE.sizeof(summary.totalSize);
+ return size;
+ }
+ }
+}
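
Since StreamSummary ships with an IVersionedSerializer, a summary can be round-tripped through any DataOutput/DataInput pair. A hedged sketch of such a round trip, assuming it runs inside the Cassandra tree so StreamSummary and MessagingService.current_version are on the classpath:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.UUID;

    import org.apache.cassandra.net.MessagingService;
    import org.apache.cassandra.streaming.StreamSummary;

    public class SummaryRoundTrip
    {
        public static void main(String[] args) throws IOException
        {
            StreamSummary original = new StreamSummary(UUID.randomUUID(), 3, 1024L);

            // serialize to an in-memory buffer
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            StreamSummary.serializer.serialize(original, new DataOutputStream(baos), MessagingService.current_version);

            // deserialize it back and compare
            DataInput in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
            StreamSummary copy = StreamSummary.serializer.deserialize(in, MessagingService.current_version);

            System.out.println(original.equals(copy)); // true: equals() compares cfId, files and totalSize
        }
    }
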
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTask.java b/src/java/org/apache/cassandra/streaming/StreamTask.java
new file mode 100644
index 0000000..9e9e06f
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamTask.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.util.UUID;
+
+/**
+ * StreamTask is an abstraction of the streaming task performed over a specific ColumnFamily.
+ */
+public abstract class StreamTask
+{
+ /** StreamSession that this task belongs to */
+ protected final StreamSession session;
+
+ protected final UUID cfId;
+
+ protected StreamTask(StreamSession session, UUID cfId)
+ {
+ this.session = session;
+ this.cfId = cfId;
+ }
+
+ /**
+ * @return total number of files this task receives/streams.
+ */
+ public abstract int getTotalNumberOfFiles();
+
+ /**
+ * @return total number of bytes this task receives/streams
+ */
+ public abstract long getTotalSize();
+
+ /**
+ * @return StreamSummary that describes this task
+ */
+ public StreamSummary getSummary()
+ {
+ return new StreamSummary(cfId, getTotalNumberOfFiles(), getTotalSize());
+ }
+}
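
To see how getSummary() composes the two abstract accessors, here is a hypothetical minimal subclass. FixedTask is illustrative only and not part of this patch; it passes a null session, which getSummary() never touches:

    import java.util.UUID;

    import org.apache.cassandra.streaming.StreamTask;

    public class FixedTask extends StreamTask
    {
        private final int files;
        private final long size;

        public FixedTask(UUID cfId, int files, long size)
        {
            super(null, cfId); // no session needed for this illustration
            this.files = files;
            this.size = size;
        }

        public int getTotalNumberOfFiles() { return files; }
        public long getTotalSize() { return size; }

        public static void main(String[] args)
        {
            FixedTask task = new FixedTask(UUID.randomUUID(), 2, 2048L);
            // getSummary() simply packages cfId with the two accessors above
            System.out.println(task.getSummary()); // StreamSummary{cfId=..., files=2, totalSize=2048}
        }
    }
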
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
new file mode 100644
index 0000000..ba2df03
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.streaming.messages.FileMessage;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * StreamTransferTask sends sections of SSTable files belonging to a certain ColumnFamily.
+ */
+public class StreamTransferTask extends StreamTask
+{
+ private final AtomicInteger sequenceNumber = new AtomicInteger(0);
+
+ private final Map<Integer, FileMessage> files = new HashMap<>();
+
+ private long totalSize;
+
+ public StreamTransferTask(StreamSession session, UUID cfId)
+ {
+ super(session, cfId);
+ }
+
+ public void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections)
+ {
+ assert sstable != null && cfId.equals(sstable.metadata.cfId);
+ FileMessage message = new FileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections);
+ files.put(message.header.sequenceNumber, message);
+ totalSize += message.header.size();
+ }
+
+ /**
+ * Mark the file with the given sequence number as completely sent.
+ *
+ * @param sequenceNumber sequence number of completed file transfer
+ */
+ public void complete(int sequenceNumber)
+ {
+ files.remove(sequenceNumber);
+ // all files sent; notify the session that this task is complete.
+ if (files.isEmpty())
+ session.taskCompleted(this);
+ }
+
+ public int getTotalNumberOfFiles()
+ {
+ return files.size();
+ }
+
+ public long getTotalSize()
+ {
+ return totalSize;
+ }
+
+ public Collection<FileMessage> getFileMessages()
+ {
+ return files.values();
+ }
+
+ public FileMessage createMessageForRetry(int sequenceNumber)
+ {
+ return files.get(sequenceNumber);
+ }
+}
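
The bookkeeping in StreamTransferTask boils down to: hand out a monotonically increasing sequence number per outgoing file, and declare the task complete once every number has been acknowledged via complete(). A self-contained sketch of that pattern; all names below are illustrative, not part of the patch:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    public class TransferBookkeeping
    {
        private final AtomicInteger sequenceNumber = new AtomicInteger(0);
        private final Map<Integer, String> pending = new HashMap<>();

        public int add(String file)
        {
            // each file gets the next sequence number, like addTransferFile()
            int seq = sequenceNumber.getAndIncrement();
            pending.put(seq, file);
            return seq;
        }

        public void complete(int seq)
        {
            // like complete(int): remove the acked file, finish when none remain
            pending.remove(seq);
            if (pending.isEmpty())
                System.out.println("all files acknowledged; task complete");
        }

        public static void main(String[] args)
        {
            TransferBookkeeping task = new TransferBookkeeping();
            int a = task.add("ks-cf-1-Data.db");
            int b = task.add("ks-cf-2-Data.db");
            task.complete(a);
            task.complete(b); // prints the completion message
        }
    }
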
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
new file mode 100644
index 0000000..04301ba
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+
+import com.google.common.util.concurrent.RateLimiter;
+import com.ning.compress.lzf.LZFOutputStream;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.DataIntegrityMetadata;
+import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * StreamWriter writes the given sections of the SSTable to the given channel.
+ */
+public class StreamWriter
+{
+ private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
+
+ protected final SSTableReader sstable;
+ protected final Collection<Pair<Long, Long>> sections;
+ protected final RateLimiter limiter = StreamManager.getRateLimiter();
+ protected final StreamSession session;
+
+ private OutputStream compressedOutput;
+
+ // allocate buffer to use for transfers only once
+ private byte[] transferBuffer;
+
+ public StreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, StreamSession session)
+ {
+ this.session = session;
+ this.sstable = sstable;
+ this.sections = sections;
+ }
+
+ /**
+ * Stream the specified sections of the file to the given channel.
+ *
+ * StreamWriter uses LZF compression on the wire to reduce the amount of data to transfer.
+ *
+ * @param channel the channel this writes data to
+ * @throws IOException on any I/O error
+ */
+ public void write(WritableByteChannel channel) throws IOException
+ {
+ long totalSize = totalSize();
+ RandomAccessReader file = sstable.openDataReader();
+ ChecksumValidator validator = null;
+ if (new File(sstable.descriptor.filenameFor(Component.CRC)).exists())
+ validator = DataIntegrityMetadata.checksumValidator(sstable.descriptor);
+
+ transferBuffer = validator == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[validator.chunkSize];
+
+ // setting up data compression stream
+ compressedOutput = new LZFOutputStream(Channels.newOutputStream(channel));
+ long progress = 0L;
+
+ try
+ {
+ // stream each of the required sections of the file
+ for (Pair<Long, Long> section : sections)
+ {
+ long start = validator == null ? section.left : validator.chunkStart(section.left);
+ int skipBytes = (int) (section.left - start);
+ // seek to the beginning of the section
+ file.seek(start);
+ if (validator != null)
+ validator.seek(start);
+
+ // length of the section to read
+ long length = section.right - start;
+ // tracks write progress
+ long bytesTransferred = 0;
+ while (bytesTransferred < length)
+ {
+ long lastWrite = write(file, validator, skipBytes, length, bytesTransferred);
+ bytesTransferred += lastWrite;
+ progress += lastWrite;
+ session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
+ skipBytes = 0;
+ }
+
+ // make sure that the current section is sent
+ compressedOutput.flush();
+ }
+ }
+ finally
+ {
+ // close the file no matter what happens
+ FileUtils.closeQuietly(file);
+ }
+
+ // release reference only when completed successfully
+ sstable.releaseReference();
+ }
+
+ protected long totalSize()
+ {
+ long size = 0;
+ for (Pair<Long, Long> section : sections)
+ size += section.right - section.left;
+ return size;
+ }
+
+ /**
+ * Sequentially read bytes from the file and write them to the output stream
+ *
+ * @param reader The file reader to read from
+ * @param validator validator to verify data integrity
+ * @param start Number of bytes to skip in the transfer but still include in validation
+ * @param length The full length that should be transferred
+ * @param bytesTransferred Number of bytes already transferred
+ *
+ * @return Number of bytes transferred
+ *
+ * @throws java.io.IOException on any I/O error
+ */
+ protected long write(RandomAccessReader reader, ChecksumValidator validator, int start, long length, long bytesTransferred) throws IOException
+ {
+ int toTransfer = (int) Math.min(transferBuffer.length, length - bytesTransferred);
+ int minReadable = (int) Math.min(transferBuffer.length, reader.length() - reader.getFilePointer());
+
+ reader.readFully(transferBuffer, 0, minReadable);
+ if (validator != null)
+ validator.validate(transferBuffer, 0, minReadable);
+
+ limiter.acquire(toTransfer);
+ compressedOutput.write(transferBuffer, start, (toTransfer - start));
+
+ return toTransfer;
+ }
+}
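
The trickiest part of write(WritableByteChannel) is the chunk alignment: reads must start on a checksum-chunk boundary, so the first read of a section may cover bytes that are validated but never sent. A small sketch of that arithmetic; chunkStart below is an assumed equivalent of ChecksumValidator.chunkStart, shown here only to make the numbers concrete:

    public class ChunkAlignment
    {
        // assumed equivalent of validator.chunkStart(): round down to a chunk boundary
        static long chunkStart(long offset, int chunkSize)
        {
            return (offset / chunkSize) * chunkSize;
        }

        public static void main(String[] args)
        {
            int chunkSize = 64 * 1024;        // matches DEFAULT_CHUNK_SIZE above
            long sectionStart = 100000L;      // requested section offset

            long start = chunkStart(sectionStart, chunkSize); // 65536: where the read begins
            int skipBytes = (int) (sectionStart - start);     // 34464 bytes validated, not sent

            System.out.println("seek to " + start + ", skip " + skipBytes + " bytes on the wire");
        }
    }
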
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
index 8c2a7bc..3730b0e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java
@@ -23,33 +23,24 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.UUIDSerializer;
-
/**
* Task that makes two nodes exchange (stream) some ranges (for a given table/cf).
* This handles the case where the local node is neither of the two nodes that
@@ -72,9 +63,9 @@ public class StreamingRepairTask implements Runnable
private final String tableName;
private final String cfName;
private final Collection<Range<Token>> ranges;
- private final IStreamCallback callback;
+ private final StreamEventHandler callback;
- private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, IStreamCallback callback)
+ private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, StreamEventHandler callback)
{
this.id = id;
this.owner = owner;
@@ -121,21 +112,15 @@ public class StreamingRepairTask implements Runnable
private void initiateStreaming()
{
- ColumnFamilyStore cfstore = Table.open(tableName).getColumnFamilyStore(cfName);
- try
- {
- logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", id, ranges.size(), dst));
- Collection<ColumnFamilyStore> cfses = Collections.singleton(cfstore);
- // send ranges to the remote node
- StreamOutSession outsession = StreamOutSession.create(tableName, dst, callback);
- StreamOut.transferRanges(outsession, cfses, ranges, OperationType.AES, false);
- // request ranges from the remote node
- StreamIn.requestRanges(dst, tableName, cfses, ranges, callback, OperationType.AES);
- }
- catch(Exception e)
- {
- throw new RuntimeException("Streaming repair failed", e);
- }
+ logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", id, ranges.size(), dst));
+ StreamResultFuture op = new StreamPlan("Repair")
+ .flushBeforeTransfer(true)
+ // request ranges from the remote node
+ .requestRanges(dst, tableName, ranges, cfName)
+ // send ranges to the remote node
+ .transferRanges(dst, tableName, ranges, cfName)
+ .execute();
+ op.addEventListener(callback);
}
private void forwardToSource()
@@ -147,45 +132,34 @@ public class StreamingRepairTask implements Runnable
MessagingService.instance().sendOneWay(msg, src);
}
- private static IStreamCallback makeReplyingCallback(final InetAddress taskOwner, final UUID taskId)
+ private static StreamEventHandler makeReplyingCallback(final InetAddress taskOwner, final UUID taskId)
{
- return new IStreamCallback()
+ return new StreamEventHandler()
{
- // we expect one callback for the receive, and one for the send
- private final AtomicInteger outstanding = new AtomicInteger(2);
-
- public void onSuccess()
+ public void onSuccess(StreamState finalState)
{
- if (outstanding.decrementAndGet() > 0)
- return; // waiting on more calls
-
StreamingRepairResponse.reply(taskOwner, taskId);
}
- public void onFailure() {}
+ public void onFailure(Throwable t) {}
+ public void handleStreamEvent(StreamEvent event) {}
};
}
// wrap a given callback so as to unregister the streaming repair task on completion
- private static IStreamCallback wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask)
+ private static StreamEventHandler wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask)
{
- return new IStreamCallback()
+ return new StreamEventHandler()
{
- // we expect one callback for the receive, and one for the send
- private final AtomicInteger outstanding = new AtomicInteger(isLocalTask ? 2 : 1);
-
- public void onSuccess()
+ public void onSuccess(StreamState finalState)
{
- if (outstanding.decrementAndGet() > 0)
- // waiting on more calls
- return;
-
tasks.remove(taskid);
if (callback != null)
callback.run();
}
- public void onFailure() {}
+ public void onFailure(Throwable t) {}
+ public void handleStreamEvent(StreamEvent event) {}
};
}
@@ -220,7 +194,10 @@ public class StreamingRepairTask implements Runnable
logger.info(String.format("[streaming task #%s] task succeeded", task.id));
if (task.callback != null)
- task.callback.onSuccess();
+ {
+ // TODO: pass the final StreamState instead of null
+ task.callback.onSuccess(null);
+ }
}
private static void reply(InetAddress remote, UUID taskid)