You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/20 19:07:16 UTC
[4/5] Streaming 2.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
deleted file mode 100644
index 04890ba..0000000
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.ning.compress.lzf.LZFOutputStream;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.util.DataIntegrityMetadata;
-import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.metrics.StreamingMetrics;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.Throttle;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-public class FileStreamTask extends WrappedRunnable
-{
- private static final Logger logger = LoggerFactory.getLogger(FileStreamTask.class);
-
- private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
- public static final int MAX_CONNECT_ATTEMPTS = 4;
-
- protected final StreamHeader header;
- protected final InetAddress to;
-
- // communication socket
- protected Socket socket;
- // socket's output/input stream
- private OutputStream output;
- private OutputStream compressedoutput;
- private DataInputStream input;
- // allocate buffer to use for transfers only once
- private byte[] transferBuffer;
- // outbound global throughput limiter
- protected final Throttle throttle;
- private final StreamReplyVerbHandler handler = new StreamReplyVerbHandler();
- protected final StreamingMetrics metrics;
-
- public FileStreamTask(StreamHeader header, InetAddress to)
- {
- this.header = header;
- this.to = to;
- this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
- {
- /** @return Instantaneous throughput target in bytes per millisecond. */
- public int targetThroughput()
- {
- if (DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() < 1)
- // throttling disabled
- return 0;
- // total throughput
- int totalBytesPerMS = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * 1024 * 1024 / 8 / 1000;
- // per stream throughput (target bytes per MS)
- return totalBytesPerMS / Math.max(1, (int)StreamingMetrics.activeStreamsOutbound.count());
- }
- });
- metrics = StreamingMetrics.get(to);
- }
-
- public void runMayThrow() throws IOException
- {
- try
- {
- connectAttempt();
- // successfully connected: stream.
- // (at this point, if we fail, it is the receiver's job to re-request)
- stream();
-
- StreamOutSession session = StreamOutSession.get(header.sessionId);
- if (session == null)
- {
- logger.info("Found no stream out session at end of file stream task - this is expected if the receiver went down");
- }
- else if (session.getFiles().size() == 0)
- {
- // we are the last of our kind, receive the final confirmation before closing
- receiveReply();
- logger.info("Finished streaming session to {}", to);
- }
- }
- catch (IOException e)
- {
- StreamOutSession session = StreamOutSession.get(header.sessionId);
- if (session != null)
- session.close(false);
- throw e;
- }
- finally
- {
- try
- {
- close();
- }
- catch (IOException e)
- {
- if (logger.isDebugEnabled())
- logger.debug("error closing socket", e);
- }
- }
- if (logger.isDebugEnabled())
- logger.debug("Done streaming " + header.file);
- }
-
- /**
- * Stream file by it's sections specified by this.header
- * @throws IOException on any I/O error
- */
- protected void stream() throws IOException
- {
- ByteBuffer headerBuffer = MessagingService.instance().constructStreamHeader(header, false, MessagingService.instance().getVersion(to));
- // write header (this should not be compressed for compatibility with other messages)
- output.write(ByteBufferUtil.getArray(headerBuffer));
-
- if (header.file == null)
- return;
-
- // try to skip kernel page cache if possible
- RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()));
- Descriptor desc = Descriptor.fromFilename(header.file.getFilename());
- ChecksumValidator metadata = null;
- if (new File(desc.filenameFor(Component.CRC)).exists())
- metadata = DataIntegrityMetadata.checksumValidator(desc);
- transferBuffer = metadata == null ? new byte[DEFAULT_CHUNK_SIZE] : new byte[metadata.chunkSize];
-
- // setting up data compression stream
- compressedoutput = new LZFOutputStream(output);
-
- StreamingMetrics.activeStreamsOutbound.inc();
- try
- {
- long totalBytesTransferred = 0;
- // stream each of the required sections of the file
- for (Pair<Long, Long> section : header.file.sections)
- {
- long start = metadata == null ? section.left : metadata.chunkStart(section.left);
- int skipBytes = (int) (section.left - start);
- // seek to the beginning of the section
- file.seek(start);
- if (metadata != null)
- metadata.seek(start);
-
- // length of the section to read
- long length = section.right - start;
- // tracks write progress
- long bytesTransferred = 0;
-
- while (bytesTransferred < length)
- {
- long lastWrite = write(file, metadata, skipBytes, length, bytesTransferred);
- bytesTransferred += lastWrite;
- totalBytesTransferred += lastWrite;
- // store streaming progress
- header.file.progress += lastWrite;
- skipBytes = 0;
- }
-
- // make sure that current section is send
- compressedoutput.flush();
-
- if (logger.isDebugEnabled())
- logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
- }
- StreamingMetrics.totalOutgoingBytes.inc(totalBytesTransferred);
- metrics.outgoingBytes.inc(totalBytesTransferred);
- // receive reply confirmation
- receiveReply();
- }
- finally
- {
- StreamingMetrics.activeStreamsOutbound.dec();
-
- // no matter what happens close file
- FileUtils.closeQuietly(file);
- }
- }
-
- public static void sendReply(MessageOut message, DataOutputStream out) throws IOException
- {
- out.writeInt(MessagingService.PROTOCOL_MAGIC);
- message.serialize(out, MessagingService.current_version);
- }
-
- protected void receiveReply() throws IOException
- {
- MessagingService.validateMagic(input.readInt());
- // since we reject streaming with different version, using current_version here is fine
- MessageIn message = MessageIn.read(input, MessagingService.current_version, -1);
- assert message.verb == MessagingService.Verb.STREAM_REPLY : "Non-reply message received on stream socket";
- handler.doVerb(message, -1);
- }
-
- /**
- * Sequentially read bytes from the file and write them to the output stream
- *
- * @param reader The file reader to read from
- * @param validator validator to verify data integrity
- * @param start number of bytes to skip transfer, but include for validation.
- * @param length The full length that should be transferred
- * @param bytesTransferred Number of bytes remaining to transfer
- *
- * @return Number of bytes transferred
- *
- * @throws IOException on any I/O error
- */
- protected long write(RandomAccessReader reader, ChecksumValidator validator, int start, long length, long bytesTransferred) throws IOException
- {
- int toTransfer = (int) Math.min(transferBuffer.length, length - bytesTransferred);
- int minReadable = (int) Math.min(transferBuffer.length, reader.length() - reader.getFilePointer());
-
- reader.readFully(transferBuffer, 0, minReadable);
- if (validator != null)
- validator.validate(transferBuffer, 0, minReadable);
-
- compressedoutput.write(transferBuffer, start, (toTransfer - start));
- throttle.throttleDelta(toTransfer);
-
- return toTransfer;
- }
-
- /**
- * Connects to the destination, with backoff for failed attempts.
- * TODO: all nodes on a cluster must currently use the same storage port
- * @throws IOException If all attempts fail.
- */
- private void connectAttempt() throws IOException
- {
- int attempts = 0;
- while (true)
- {
- try
- {
- socket = MessagingService.instance().getConnectionPool(to).newSocket();
- socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
- output = socket.getOutputStream();
- input = new DataInputStream(socket.getInputStream());
- break;
- }
- catch (IOException e)
- {
- if (++attempts >= MAX_CONNECT_ATTEMPTS)
- throw e;
-
- long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
- logger.warn("Failed attempt " + attempts + " to connect to " + to + " to stream " + header.file + ". Retrying in " + waitms + " ms. (" + e + ")");
- Uninterruptibles.sleepUninterruptibly(waitms, TimeUnit.MILLISECONDS);
- }
- }
- }
-
- protected void close() throws IOException
- {
- if (output != null)
- output.close();
- }
-
- public String toString()
- {
- return String.format("FileStreamTask(session=%s, to=%s)", header.sessionId, to);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/IStreamCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IStreamCallback.java b/src/java/org/apache/cassandra/streaming/IStreamCallback.java
deleted file mode 100644
index f0d7754..0000000
--- a/src/java/org/apache/cassandra/streaming/IStreamCallback.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.streaming;
-
-/**
- * Callback interface for streaming session success/failure.
- */
-public interface IStreamCallback
-{
- /**
- * called when stream session is finished successfully.
- */
- public void onSuccess();
-
- /**
- * called when streaming somehow got in trouble.
- */
- public void onFailure();
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
deleted file mode 100644
index 92f5c7f..0000000
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Collections;
-
-import com.google.common.base.Throwables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.ning.compress.lzf.LZFInputStream;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.metrics.StreamingMetrics;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.compress.CompressedInputStream;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.BytesReadTracker;
-import org.apache.cassandra.utils.Pair;
-
-public class IncomingStreamReader
-{
- private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class);
-
- protected final PendingFile localFile;
- protected final PendingFile remoteFile;
- protected final StreamInSession session;
- private final InputStream underliningStream;
- private final StreamingMetrics metrics;
-
- public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
- {
- socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
- InetAddress host = ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
- if (header.pendingFiles.isEmpty() && header.file != null)
- {
- // StreamInSession should be created already when receiving 2nd and after files
- if (!StreamInSession.hasSession(header.sessionId))
- {
- StreamReply reply = new StreamReply("", header.sessionId, StreamReply.Status.SESSION_FAILURE);
- FileStreamTask.sendReply(reply.createMessage(), new DataOutputStream(socket.getOutputStream()));
- throw new IOException("Session " + header.sessionId + " already closed.");
- }
- }
- session = StreamInSession.get(host, header.sessionId);
- session.setSocket(socket);
-
- session.addFiles(header.pendingFiles);
- // set the current file we are streaming so progress shows up in jmx
- session.setCurrentFile(header.file);
- session.setTable(header.table);
- // pendingFile gets the new context for the local node.
- remoteFile = header.file;
- localFile = remoteFile != null ? StreamIn.getContextMapping(remoteFile) : null;
-
- if (remoteFile != null)
- {
- if (remoteFile.compressionInfo == null)
- underliningStream = new LZFInputStream(socket.getInputStream());
- else
- underliningStream = new CompressedInputStream(socket.getInputStream(), remoteFile.compressionInfo);
- }
- else
- {
- underliningStream = null;
- }
- metrics = StreamingMetrics.get(host);
- }
-
- /**
- * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
- */
- public void read() throws IOException
- {
- if (remoteFile != null)
- {
- if (logger.isDebugEnabled())
- {
- logger.debug("Receiving stream");
- logger.debug("Creating file for {} with {} estimated keys",
- localFile.getFilename(),
- remoteFile.estimatedKeys);
- }
-
- assert remoteFile.estimatedKeys > 0;
- DataInput in = new DataInputStream(underliningStream);
- try
- {
- SSTableReader reader = streamIn(in, localFile, remoteFile);
- session.finished(remoteFile, reader);
- }
- catch (IOException ex)
- {
- retry();
- throw ex;
- }
- catch (RuntimeException e)
- {
- // if we encountered unexpected exception, fail this session
- session.close(false);
- throw e;
- }
- }
-
- session.closeIfFinished();
- }
-
- /**
- * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
- */
- private SSTableReader streamIn(DataInput input, PendingFile localFile, PendingFile remoteFile) throws IOException
- {
- ColumnFamilyStore cfs = Table.open(localFile.desc.ksname).getColumnFamilyStore(localFile.desc.cfname);
- DecoratedKey key;
- SSTableWriter writer = new SSTableWriter(localFile.getFilename(), remoteFile.estimatedKeys);
- CompactionController controller = new CompactionController(cfs, Collections.<SSTableReader>emptySet(), Integer.MIN_VALUE);
-
- try
- {
- BytesReadTracker in = new BytesReadTracker(input);
- long totalBytesRead = 0;
-
- for (Pair<Long, Long> section : localFile.sections)
- {
- long length = section.right - section.left;
- // skip to beginning of section inside chunk
- if (remoteFile.compressionInfo != null)
- ((CompressedInputStream) underliningStream).position(section.left);
- long bytesRead = 0;
- while (bytesRead < length)
- {
- in.reset(0);
-
- key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
- writer.appendFromStream(key, cfs.metadata, in);
-
- cfs.invalidateCachedRow(key);
-
- bytesRead += in.getBytesRead();
- // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
- if (remoteFile.compressionInfo != null)
- remoteFile.progress = ((CompressedInputStream) underliningStream).getTotalCompressedBytesRead();
- else
- remoteFile.progress += in.getBytesRead();
- totalBytesRead += in.getBytesRead();
- }
- }
- StreamingMetrics.totalIncomingBytes.inc(totalBytesRead);
- metrics.incomingBytes.inc(totalBytesRead);
- return writer.closeAndOpenReader();
- }
- catch (Throwable e)
- {
- writer.abort();
- if (e instanceof IOException)
- throw (IOException) e;
- else
- throw Throwables.propagate(e);
- }
- finally
- {
- controller.close();
- }
- }
-
- private void retry()
- {
- /* Ask the source node to re-stream this file. */
- session.retry(remoteFile);
-
- /* Delete the orphaned file. */
- if (new File(localFile.getFilename()).isFile())
- FileUtils.deleteWithConfirm(new File(localFile.getFilename()));
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/OperationType.java b/src/java/org/apache/cassandra/streaming/OperationType.java
deleted file mode 100644
index da6e2f6..0000000
--- a/src/java/org/apache/cassandra/streaming/OperationType.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-/**
- * Streaming operation type.
- */
-public enum OperationType
-{
- AES,
- BOOTSTRAP,
- UNBOOTSTRAP,
- RESTORE_REPLICA_COUNT,
- BULK_LOAD,
- REBUILD
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/PendingFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/PendingFile.java b/src/java/org/apache/cassandra/streaming/PendingFile.java
deleted file mode 100644
index 0d9ec35..0000000
--- a/src/java/org/apache/cassandra/streaming/PendingFile.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.streaming.compress.CompressionInfo;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * Represents portions of a file to be streamed between nodes.
- */
-public class PendingFile
-{
- public static final PendingFileSerializer serializer = new PendingFileSerializer();
-
- // NB: this reference is used to be able to release the acquired reference upon completion
- public final SSTableReader sstable;
-
- public final Descriptor desc;
- public final String component;
- public final List<Pair<Long, Long>> sections;
- public final OperationType type;
- /** total length of data to transfer */
- public final long size;
- /** estimated number of keys to transfer */
- public final long estimatedKeys;
- /** compression information. null if data is not compressed */
- public final CompressionInfo compressionInfo;
- public long progress;
-
- public PendingFile(Descriptor desc, PendingFile pf)
- {
- this(null, desc, pf.component, pf.sections, pf.type, pf.estimatedKeys, pf.compressionInfo);
- }
-
- public PendingFile(SSTableReader sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type)
- {
- this(sstable, desc, component, sections, type, 0, null);
- }
-
- public PendingFile(SSTableReader sstable,
- Descriptor desc,
- String component,
- List<Pair<Long,Long>> sections,
- OperationType type,
- long estimatedKeys,
- CompressionInfo compressionInfo)
- {
- this.sstable = sstable;
- this.desc = desc;
- this.component = component;
- this.sections = sections;
- this.type = type;
-
- long tempSize = 0;
- if (compressionInfo == null)
- {
- for (Pair<Long, Long> section : sections)
- tempSize += section.right - section.left;
- }
- else
- {
- // calculate total length of transferring chunks
- for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
- tempSize += chunk.length + 4; // 4 bytes for CRC
- }
- size = tempSize;
-
- this.estimatedKeys = estimatedKeys;
- this.compressionInfo = compressionInfo;
- }
-
- public String getFilename()
- {
- return desc.filenameFor(component);
- }
-
- public boolean equals(Object o)
- {
- if (!(o instanceof PendingFile))
- return false;
-
- PendingFile rhs = (PendingFile)o;
- return getFilename().equals(rhs.getFilename());
- }
-
- public int hashCode()
- {
- return getFilename().hashCode();
- }
-
- public String toString()
- {
- return getFilename() + " sections=" + sections.size() + " progress=" + progress + "/" + size + " - " + progress*100/size + "%";
- }
-
- public static class PendingFileSerializer implements IVersionedSerializer<PendingFile>
- {
- public void serialize(PendingFile sc, DataOutput out, int version) throws IOException
- {
- if (sc == null)
- {
- out.writeUTF("");
- return;
- }
-
- out.writeUTF(sc.desc.filenameFor(sc.component));
- out.writeUTF(sc.component);
- out.writeInt(sc.sections.size());
- for (Pair<Long,Long> section : sc.sections)
- {
- out.writeLong(section.left);
- out.writeLong(section.right);
- }
- out.writeUTF(sc.type.name());
- out.writeLong(sc.estimatedKeys);
- CompressionInfo.serializer.serialize(sc.compressionInfo, out, version);
- }
-
- public PendingFile deserialize(DataInput in, int version) throws IOException
- {
- String filename = in.readUTF();
- if (filename.isEmpty())
- return null;
-
- Descriptor desc = Descriptor.fromFilename(filename);
- String component = in.readUTF();
- int count = in.readInt();
- List<Pair<Long,Long>> sections = new ArrayList<Pair<Long,Long>>(count);
- for (int i = 0; i < count; i++)
- sections.add(Pair.create(in.readLong(), in.readLong()));
- // this controls the way indexes are rebuilt when streaming in.
- OperationType type = OperationType.RESTORE_REPLICA_COUNT;
- type = OperationType.valueOf(in.readUTF());
- long estimatedKeys = in.readLong();
- CompressionInfo info = null;
- info = CompressionInfo.serializer.deserialize(in, version);
- return new PendingFile(null, desc, component, sections, type, estimatedKeys, info);
- }
-
- public long serializedSize(PendingFile pf, int version)
- {
- if (pf == null)
- return TypeSizes.NATIVE.sizeof("");
-
- long size = TypeSizes.NATIVE.sizeof(pf.desc.filenameFor(pf.component));
- size += TypeSizes.NATIVE.sizeof(pf.component);
- size += TypeSizes.NATIVE.sizeof(pf.sections.size());
- for (Pair<Long,Long> section : pf.sections)
- size += TypeSizes.NATIVE.sizeof(section.left) + TypeSizes.NATIVE.sizeof(section.right);
- size += TypeSizes.NATIVE.sizeof(pf.type.name());
- size += TypeSizes.NATIVE.sizeof(pf.estimatedKeys);
- size += CompressionInfo.serializer.serializedSize(pf.compressionInfo, version);
- return size;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/ProgressInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
new file mode 100644
index 0000000..d308ed0
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+
+import com.google.common.base.Objects;
+
+/**
+ * ProgressInfo contains file transfer progress.
+ */
+public class ProgressInfo implements Serializable
+{
+ /**
+ * Direction of the stream.
+ */
+ public static enum Direction
+ {
+ OUT(0),
+ IN(1);
+
+ public final byte code;
+
+ private Direction(int code)
+ {
+ this.code = (byte) code;
+ }
+
+ public static Direction fromByte(byte direction)
+ {
+ return direction == 0 ? OUT : IN;
+ }
+ }
+
+ public final InetAddress peer;
+ public final String fileName;
+ public final Direction direction;
+ public final long currentBytes;
+ public final long totalBytes;
+
+ public ProgressInfo(InetAddress peer, String fileName, Direction direction, long currentBytes, long totalBytes)
+ {
+ assert totalBytes > 0;
+
+ this.peer = peer;
+ this.fileName = fileName;
+ this.direction = direction;
+ this.currentBytes = currentBytes;
+ this.totalBytes = totalBytes;
+ }
+
+ /**
+ * @return true if file transfer is completed
+ */
+ public boolean isCompleted()
+ {
+ return currentBytes == totalBytes;
+ }
+
+ /**
+ * ProgressInfo is considered to be equal only when all attributes except currentBytes are equal.
+ */
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ProgressInfo that = (ProgressInfo) o;
+
+ if (totalBytes != that.totalBytes) return false;
+ if (direction != that.direction) return false;
+ if (!fileName.equals(that.fileName)) return false;
+ return peer.equals(that.peer);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(peer, fileName, direction, totalBytes);
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder(fileName);
+ sb.append(" ").append(currentBytes);
+ sb.append("/").append(totalBytes).append(" bytes");
+ sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
+ sb.append(direction == Direction.OUT ? "sent to " : "received from ");
+ sb.append(peer);
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java
new file mode 100644
index 0000000..9532041
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+/**
+ * Stream session info.
+ */
+public final class SessionInfo implements Serializable
+{
+ public final InetAddress peer;
+ /** Immutable collection of receiving summaries */
+ public final Collection<StreamSummary> receivingSummaries;
+ /** Immutable collection of sending summaries*/
+ public final Collection<StreamSummary> sendingSummaries;
+ /** Current session state */
+ public final StreamSession.State state;
+
+ private final Map<String, ProgressInfo> receivingFiles;
+ private final Map<String, ProgressInfo> sendingFiles;
+
+ public SessionInfo(InetAddress peer,
+ Collection<StreamSummary> receivingSummaries,
+ Collection<StreamSummary> sendingSummaries,
+ StreamSession.State state)
+ {
+ this.peer = peer;
+ this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
+ this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
+ this.receivingFiles = new HashMap<>();
+ this.sendingFiles = new HashMap<>();
+ this.state = state;
+ }
+
+ public boolean isFailed()
+ {
+ return state == StreamSession.State.FAILED;
+ }
+
+ /**
+ * Update progress of receiving/sending file.
+ *
+ * @param newProgress new progress info
+ */
+ public synchronized void updateProgress(ProgressInfo newProgress)
+ {
+ assert peer.equals(newProgress.peer);
+
+ Map<String, ProgressInfo> currentFiles = newProgress.direction == ProgressInfo.Direction.IN
+ ? receivingFiles : sendingFiles;
+ currentFiles.put(newProgress.fileName, newProgress);
+ }
+
+ public Collection<ProgressInfo> getReceivingFiles()
+ {
+ return receivingFiles.values();
+ }
+
+ public Collection<ProgressInfo> getSendingFiles()
+ {
+ return sendingFiles.values();
+ }
+
+ /**
+ * @return total number of files already received.
+ */
+ public long getTotalFilesReceived()
+ {
+ return getTotalFilesCompleted(receivingFiles.values());
+ }
+
+ /**
+ * @return total number of files already sent.
+ */
+ public long getTotalFilesSent()
+ {
+ return getTotalFilesCompleted(sendingFiles.values());
+ }
+
+ /**
+ * @return total size(in bytes) already received.
+ */
+ public long getTotalSizeReceived()
+ {
+ return getTotalSizeInProgress(receivingFiles.values());
+ }
+
+ /**
+ * @return total size(in bytes) already sent.
+ */
+ public long getTotalSizeSent()
+ {
+ return getTotalSizeInProgress(sendingFiles.values());
+ }
+
+ /**
+ * @return total number of files to receive in the session
+ */
+ public long getTotalFilesToReceive()
+ {
+ return getTotalFiles(receivingSummaries);
+ }
+
+ /**
+ * @return total number of files to send in the session
+ */
+ public long getTotalFilesToSend()
+ {
+ return getTotalFiles(sendingSummaries);
+ }
+
+ /**
+ * @return total size(in bytes) to receive in the session
+ */
+ public long getTotalSizeToReceive()
+ {
+ return getTotalSizes(receivingSummaries);
+ }
+
+ /**
+ * @return total size(in bytes) to send in the session
+ */
+ public long getTotalSizeToSend()
+ {
+ return getTotalSizes(sendingSummaries);
+ }
+
+ private long getTotalSizeInProgress(Collection<ProgressInfo> files)
+ {
+ long total = 0;
+ for (ProgressInfo file : files)
+ total += file.currentBytes;
+ return total;
+ }
+
+ private long getTotalFiles(Collection<StreamSummary> summaries)
+ {
+ long total = 0;
+ for (StreamSummary summary : summaries)
+ total += summary.files;
+ return total;
+ }
+
+ private long getTotalSizes(Collection<StreamSummary> summaries)
+ {
+ long total = 0;
+ for (StreamSummary summary : summaries)
+ total += summary.totalSize;
+ return total;
+ }
+
+ private long getTotalFilesCompleted(Collection<ProgressInfo> files)
+ {
+ Iterable<ProgressInfo> completed = Iterables.filter(files, new Predicate<ProgressInfo>()
+ {
+ public boolean apply(ProgressInfo input)
+ {
+ return input.isCompleted();
+ }
+ });
+ return Iterables.size(completed);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java
new file mode 100644
index 0000000..9af1fbd
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.UUID;
+
+public abstract class StreamEvent
+{
+ public static enum Type
+ {
+ STREAM_PREPARED,
+ STREAM_COMPLETE,
+ FILE_PROGRESS,
+ }
+
+ public final Type eventType;
+ public final UUID planId;
+
+ protected StreamEvent(Type eventType, UUID planId)
+ {
+ this.eventType = eventType;
+ this.planId = planId;
+ }
+
+ public static class SessionCompleteEvent extends StreamEvent
+ {
+ public final InetAddress peer;
+ public final boolean success;
+
+ public SessionCompleteEvent(StreamSession session)
+ {
+ super(Type.STREAM_COMPLETE, session.planId());
+ this.peer = session.peer;
+ this.success = session.isSuccess();
+ }
+ }
+
+ public static class ProgressEvent extends StreamEvent
+ {
+ public final ProgressInfo progress;
+
+ public ProgressEvent(UUID planId, ProgressInfo progress)
+ {
+ super(Type.FILE_PROGRESS, planId);
+ this.progress = progress;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "<ProgressEvent " + progress.toString() + ">";
+ }
+ }
+
+ public static class SessionPreparedEvent extends StreamEvent
+ {
+ public final SessionInfo session;
+
+ public SessionPreparedEvent(UUID planId, SessionInfo session)
+ {
+ super(Type.STREAM_PREPARED, planId);
+ this.session = session;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamEventHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamEventHandler.java b/src/java/org/apache/cassandra/streaming/StreamEventHandler.java
new file mode 100644
index 0000000..e2e84bb
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamEventHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import com.google.common.util.concurrent.FutureCallback;
+
+public interface StreamEventHandler extends FutureCallback<StreamState>
+{
+ /**
+ * Callback for various streaming events.
+ *
+ * @see StreamEvent.Type
+ * @param event Stream event.
+ */
+ void handleStreamEvent(StreamEvent event);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamException.java b/src/java/org/apache/cassandra/streaming/StreamException.java
new file mode 100644
index 0000000..6e22db2
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+public class StreamException extends Throwable
+{
+ public final StreamState finalState;
+
+ public StreamException(StreamState finalState, String message)
+ {
+ super(message);
+ this.finalState = finalState;
+ }
+
+ public StreamException(StreamState finalState, String message, Throwable cause)
+ {
+ super(message, cause);
+ this.finalState = finalState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamHeader.java b/src/java/org/apache/cassandra/streaming/StreamHeader.java
deleted file mode 100644
index a0bf832..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamHeader.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.*;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-public class StreamHeader
-{
- public static final IVersionedSerializer<StreamHeader> serializer = new StreamHeaderSerializer();
-
- public final String table;
-
- /** file being sent on initial stream */
- public final PendingFile file;
-
- /** session ID */
- public final UUID sessionId;
-
- /** files to add to the session */
- public final Collection<PendingFile> pendingFiles;
-
- public StreamHeader(String table, UUID sessionId, PendingFile file)
- {
- this(table, sessionId, file, Collections.<PendingFile>emptyList());
- }
-
- public StreamHeader(String table, UUID sessionId, PendingFile first, Collection<PendingFile> pendingFiles)
- {
- this.table = table;
- this.sessionId = sessionId;
- this.file = first;
- this.pendingFiles = pendingFiles;
- }
-
- private static class StreamHeaderSerializer implements IVersionedSerializer<StreamHeader>
- {
- public void serialize(StreamHeader sh, DataOutput out, int version) throws IOException
- {
- out.writeUTF(sh.table);
- UUIDSerializer.serializer.serialize(sh.sessionId, out, MessagingService.current_version);
- PendingFile.serializer.serialize(sh.file, out, version);
- out.writeInt(sh.pendingFiles.size());
- for (PendingFile file : sh.pendingFiles)
- PendingFile.serializer.serialize(file, out, version);
- }
-
- public StreamHeader deserialize(DataInput in, int version) throws IOException
- {
- String table = in.readUTF();
- UUID sessionId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
- PendingFile file = PendingFile.serializer.deserialize(in, version);
- int size = in.readInt();
-
- List<PendingFile> pendingFiles = new ArrayList<PendingFile>(size);
- for (int i = 0; i < size; i++)
- pendingFiles.add(PendingFile.serializer.deserialize(in, version));
- return new StreamHeader(table, sessionId, file, pendingFiles);
- }
-
- public long serializedSize(StreamHeader sh, int version)
- {
- long size = TypeSizes.NATIVE.sizeof(sh.table);
- size += TypeSizes.NATIVE.sizeof(sh.sessionId);
- size += PendingFile.serializer.serializedSize(sh.file, version);
- size += TypeSizes.NATIVE.sizeof(sh.pendingFiles.size());
- for (PendingFile file : sh.pendingFiles)
- size += PendingFile.serializer.serializedSize(file, version);
- return size;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamIn.java b/src/java/org/apache/cassandra/streaming/StreamIn.java
deleted file mode 100644
index 588728c..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamIn.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.net.InetAddress;
-import java.util.Collection;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * for streaming data from other nodes in to this one.
- * Sends a STREAM_REQUEST Message to the source node(s), after which StreamOut on that side takes over.
- * See StreamOut for details.
- */
-public class StreamIn
-{
- private static final Logger logger = LoggerFactory.getLogger(StreamIn.class);
-
- /** Request ranges for all column families in the given keyspace. */
- public static void requestRanges(InetAddress source, String tableName, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
- {
- requestRanges(source, tableName, Table.open(tableName).getColumnFamilyStores(), ranges, callback, type);
- }
-
- /**
- * Request ranges to be transferred from specific CFs
- */
- public static void requestRanges(InetAddress source, String tableName, Collection<ColumnFamilyStore> columnFamilies, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
- {
- assert ranges.size() > 0;
-
- if (logger.isDebugEnabled())
- logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
- StreamInSession session = StreamInSession.create(source, callback);
- StreamRequest srm = new StreamRequest(FBUtilities.getBroadcastAddress(),
- ranges,
- tableName,
- columnFamilies,
- session.getSessionId(),
- type);
- MessagingService.instance().sendOneWay(srm.createMessage(), source);
- }
-
- /** Translates remote files to local files by creating a local sstable per remote sstable. */
- public static PendingFile getContextMapping(PendingFile remote)
- {
- /* Create a local sstable for each remote sstable */
- Descriptor remotedesc = remote.desc;
- if (!remotedesc.isStreamCompatible())
- throw new UnsupportedOperationException(String.format("SSTable %s is not compatible with current version %s",
- remote.getFilename(), Descriptor.Version.CURRENT));
-
- // new local sstable
- Table table = Table.open(remotedesc.ksname);
- ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname);
- Directories.DataDirectory localDir = cfStore.directories.getLocationCapableOfSize(remote.size);
- if (localDir == null)
- throw new RuntimeException("Insufficient disk space to store " + remote.size + " bytes");
- Descriptor localdesc = Descriptor.fromFilename(cfStore.getTempSSTablePath(cfStore.directories.getLocationForDisk(localDir)));
-
- return new PendingFile(localdesc, remote);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
deleted file mode 100644
index 370183f..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.util.*;
-import java.util.concurrent.ConcurrentMap;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDGen;
-
-/** each context gets its own StreamInSession. So there may be >1 Session per host */
-public class StreamInSession extends AbstractStreamSession
-{
- private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class);
-
- private static final ConcurrentMap<UUID, StreamInSession> sessions = new NonBlockingHashMap<UUID, StreamInSession>();
-
- private final Set<PendingFile> files = new NonBlockingHashSet<PendingFile>();
- private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
- private PendingFile current;
- private Socket socket;
- private volatile int retries;
-
- private StreamInSession(InetAddress host, UUID sessionId, IStreamCallback callback)
- {
- super(null, host, sessionId, callback);
- }
-
- public static StreamInSession create(InetAddress host, IStreamCallback callback)
- {
- StreamInSession session = new StreamInSession(host, UUIDGen.getTimeUUID(), callback);
- sessions.put(session.getSessionId(), session);
- return session;
- }
-
- public static StreamInSession get(InetAddress host, UUID sessionId)
- {
- StreamInSession session = sessions.get(sessionId);
- if (session == null)
- {
- StreamInSession possibleNew = new StreamInSession(host, sessionId, null);
- if ((session = sessions.putIfAbsent(sessionId, possibleNew)) == null)
- session = possibleNew;
- }
- return session;
- }
-
- public static boolean hasSession(UUID sessionId)
- {
- return sessions.get(sessionId) != null;
- }
-
- public void setCurrentFile(PendingFile file)
- {
- this.current = file;
- }
-
- public void setTable(String table)
- {
- this.table = table;
- }
-
- public void setSocket(Socket socket)
- {
- this.socket = socket;
- }
-
- public void addFiles(Collection<PendingFile> files)
- {
- for (PendingFile file : files)
- {
- if(logger.isDebugEnabled())
- logger.debug("Adding file {} to Stream Request queue", file.getFilename());
- this.files.add(file);
- }
- }
-
- public void finished(PendingFile remoteFile, SSTableReader reader) throws IOException
- {
- if (logger.isDebugEnabled())
- logger.debug("Finished {} (from {}). Sending ack to {}", new Object[] {remoteFile, getHost(), this});
-
- assert reader != null;
- readers.add(reader);
- files.remove(remoteFile);
- if (remoteFile.equals(current))
- current = null;
- StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_FINISHED);
- // send a StreamStatus message telling the source node it can delete this file
- sendMessage(reply.createMessage());
- logger.debug("ack {} sent for {}", reply, remoteFile);
- }
-
- public void retry(PendingFile remoteFile)
- {
- retries++;
- if (retries > DatabaseDescriptor.getMaxStreamingRetries())
- {
- logger.error(String.format("Failed streaming session %s from %s while receiving %s", getSessionId(), getHost().toString(), current),
- new IllegalStateException("Too many retries for " + remoteFile));
- close(false);
- return;
- }
- StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY);
- logger.info("Streaming of file {} for {} failed: requesting a retry.", remoteFile, this);
- try
- {
- sendMessage(reply.createMessage());
- }
- catch (IOException e)
- {
- logger.error("Sending retry message failed, closing session.", e);
- close(false);
- }
- }
-
- public void sendMessage(MessageOut<StreamReply> message) throws IOException
- {
- DataOutputStream out = new DataOutputStream(socket.getOutputStream());
- FileStreamTask.sendReply(message,
- out);
- out.flush();
- }
-
- public void closeIfFinished() throws IOException
- {
- if (files.isEmpty())
- {
- HashMap <ColumnFamilyStore, List<SSTableReader>> cfstores = new HashMap<ColumnFamilyStore, List<SSTableReader>>();
- try
- {
- for (SSTableReader sstable : readers)
- {
- assert sstable.getTableName().equals(table);
-
- // Acquire the reference (for secondary index building) before submitting the index build,
- // so it can't get compacted out of existence in between
- if (!sstable.acquireReference())
- throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred");
-
- ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName());
- if (!cfstores.containsKey(cfs))
- cfstores.put(cfs, new ArrayList<SSTableReader>());
- cfstores.get(cfs).add(sstable);
- }
-
- // add sstables and build secondary indexes
- for (Map.Entry<ColumnFamilyStore, List<SSTableReader>> entry : cfstores.entrySet())
- {
- if (entry.getKey() != null)
- {
- entry.getKey().addSSTables(entry.getValue());
- entry.getKey().indexManager.maybeBuildSecondaryIndexes(entry.getValue(), entry.getKey().indexManager.allIndexesNames());
- }
- }
- }
- finally
- {
- for (List<SSTableReader> referenced : cfstores.values())
- SSTableReader.releaseReferences(referenced);
- }
-
- // send reply to source that we're done
- StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FINISHED);
- logger.info("Finished streaming session {} from {}", getSessionId(), getHost());
- try
- {
- if (socket != null)
- FileStreamTask.sendReply(reply.createMessage(),
- new DataOutputStream(socket.getOutputStream()));
- else
- logger.debug("No socket to reply to {} with!", getHost());
- }
- finally
- {
- if (socket != null)
- socket.close();
- }
-
- close(true);
- }
- }
-
- protected void closeInternal(boolean success)
- {
- sessions.remove(sessionId);
- if (!success && FailureDetector.instance.isAlive(getHost()))
- {
- StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FAILURE);
- MessagingService.instance().sendOneWay(reply.createMessage(), getHost());
- }
- }
-
- /** query method to determine which hosts are streaming to this node. */
- public static Set<InetAddress> getSources()
- {
- HashSet<InetAddress> set = new HashSet<InetAddress>();
- for (StreamInSession session : sessions.values())
- {
- set.add(session.getHost());
- }
- return set;
- }
-
- /** query the status of incoming files. */
- public static Set<PendingFile> getIncomingFiles(InetAddress host)
- {
- Set<PendingFile> set = new HashSet<PendingFile>();
- for (Map.Entry<UUID, StreamInSession> entry : sessions.entrySet())
- {
- StreamInSession session = entry.getValue();
- if (session.getHost().equals(host))
- {
- if (session.current != null)
- set.add(session.current);
- set.addAll(session.files);
- }
- }
- return set;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
new file mode 100644
index 0000000..d64cc65
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+/**
+ * StreamManager manages currently running {@link StreamResultFuture}s and provides status of all operation invoked.
+ *
+ * All stream operation should be created through this class to track streaming status and progress.
+ */
+public class StreamManager implements StreamManagerMBean
+{
+ public static final StreamManager instance = new StreamManager();
+
+ private static final RateLimiter limiter = RateLimiter.create(Double.MAX_VALUE);
+
+ /**
+ * Gets streaming rate limiter.
+ * When stream_throughput_outbound_megabits_per_sec is 0, this returns rate limiter
+ * with the rate of Double.MAX_VALUE bytes per second.
+ * Rate unit is bytes per sec.
+ *
+ * @return RateLimiter with rate limit set
+ */
+ public static RateLimiter getRateLimiter()
+ {
+ double currentThroughput = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * 1024 * 1024 / 8 / 1000;
+ // if throughput is set to 0, throttling is disabled
+ if (currentThroughput == 0)
+ currentThroughput = Double.MAX_VALUE;
+ if (limiter.getRate() != currentThroughput)
+ limiter.setRate(currentThroughput);
+ return limiter;
+ }
+
+ /** Currently running stream plans. Removed after completion/failure. */
+ private final Map<UUID, StreamResultFuture> currentStreams = new NonBlockingHashMap<>();
+
+ public Set<StreamState> getCurrentStreams()
+ {
+ return Sets.newHashSet(Iterables.transform(currentStreams.values(), new Function<StreamResultFuture, StreamState>()
+ {
+ public StreamState apply(StreamResultFuture input)
+ {
+ return input.getCurrentState();
+ }
+ }));
+ }
+
+ public void register(final StreamResultFuture result)
+ {
+ // Make sure we remove the stream on completion (whether successful or not)
+ result.addListener(new Runnable()
+ {
+ public void run()
+ {
+ currentStreams.remove(result.planId);
+ }
+ }, MoreExecutors.sameThreadExecutor());
+
+ currentStreams.put(result.planId, result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
new file mode 100644
index 0000000..eb6f6ae
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+public interface StreamManagerMBean
+{
+ public static final String OBJECT_NAME = "org.apache.cassandra.net:type=StreamManager";
+
+ /**
+ * Returns the current state of all ongoing streams.
+ */
+ Set<StreamState> getCurrentStreams();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
deleted file mode 100644
index 709741b..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.Future;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.streaming.compress.CompressionInfo;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * This class handles streaming data from one node to another.
- *
- * The source node [the Out side] is always in charge of the streaming session. Streams may
- * be initiated either directly by the source via the methods in this class,
- * or on demand from the target (via StreamRequest).
- *
- * Files to stream are grouped into sessions, which can have callbacks associated
- * with them so that (for instance) we can mark a new node a full member of the
- * cluster after all the data it needs has been streamed.
- *
- * The source begins a session by sending
- * a Message with the stream bit flag in the Header turned on. Part of that Message
- * will include a StreamHeader that includes the files that will be streamed as part
- * of that session, as well as the first file-to-be-streamed. (Combining session list
- * and first file like this is inconvenient, but not as inconvenient as the old
- * three-part send-file-list, wait-for-ack, start-first-file dance.)
- *
- * This is done over a separate TCP connection to avoid blocking ordinary intra-node
- * traffic during the stream. So there is no Handler for the main stream of data --
- * when a connection sets the Stream bit, IncomingTcpConnection knows what to expect
- * without any further Messages.
- *
- * After each file, the target node [the In side] will send a StreamReply indicating success
- * (FILE_FINISHED) or failure (FILE_RETRY).
- *
- * When all files have been successfully transferred and integrated the target will
- * send an additional SESSION_FINISHED reply and the session is complete.
- *
- * For Stream requests (for bootstrap), one subtlety is that we always have to
- * create at least one stream reply, even if the list of files is empty, otherwise the
- * target has no way to know that it can stop waiting for an answer.
- *
- */
-public class StreamOut
-{
- private static final Logger logger = LoggerFactory.getLogger(StreamOut.class);
-
- /**
- * Stream the given ranges to the target endpoint from each CF in the given keyspace.
- */
- public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
- {
- transferRanges(target, table, table.getColumnFamilyStores(), ranges, callback, type);
- }
-
- /**
- * Stream the given ranges to the target endpoint for provided CFs in the given keyspace.
- */
- public static void transferRanges(InetAddress target, Table table, Iterable<ColumnFamilyStore> cfses, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
- {
- StreamOutSession session = StreamOutSession.create(table.getName(), target, callback);
- transferRanges(session, cfses, ranges, type);
- }
-
- /**
- * Flushes matching column families from the given keyspace, or all columnFamilies
- * if the cf list is empty.
- */
- private static void flushSSTables(Iterable<ColumnFamilyStore> stores)
- {
- logger.info("Flushing memtables for {}...", stores);
- List<Future<?>> flushes = new ArrayList<Future<?>>();
- for (ColumnFamilyStore cfstore : stores)
- flushes.add(cfstore.forceFlush());
- FBUtilities.waitOnFutures(flushes);
- }
-
- /**
- * Stream the given ranges to the target endpoint from each of the given CFs.
- */
- public static void transferRanges(StreamOutSession session, Iterable<ColumnFamilyStore> cfses, Collection<Range<Token>> ranges, OperationType type)
- {
- transferRanges(session, cfses, ranges, type, true);
- }
-
- /**
- * Stream the given ranges to the target endpoint from each of the given CFs.
- */
- public static void transferRanges(StreamOutSession session,
- Iterable<ColumnFamilyStore> cfses,
- Collection<Range<Token>> ranges,
- OperationType type,
- boolean flushTables)
- {
- assert ranges.size() > 0;
- logger.info("Beginning transfer to {}", session.getHost());
- logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
-
- if (flushTables)
- flushSSTables(cfses);
-
- List<SSTableReader> sstables = Lists.newLinkedList();
- for (ColumnFamilyStore cfStore : cfses)
- {
- List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
- for (Range<Token> range : ranges)
- rowBoundsList.add(range.toRowBounds());
- ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
- sstables.addAll(view.sstables);
- }
-
- transferSSTables(session, sstables, ranges, type);
- }
-
- /**
- * Low-level transfer of matching portions of a group of sstables from a single table to the target endpoint.
- * You should probably call transferRanges instead. This moreover assumes that references have been acquired on the sstables.
- */
- public static void transferSSTables(StreamOutSession session, Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, OperationType type)
- {
- List<PendingFile> pending = createPendingFiles(sstables, ranges, type);
-
- // Even if the list of pending files is empty, we need to initiate the transfer otherwise
- // the remote end will hang in cases where this was a requested transfer.
- session.addFilesToStream(pending);
- session.begin();
- }
-
- // called prior to sending anything.
- private static List<PendingFile> createPendingFiles(Iterable<SSTableReader> sstables, Collection<Range<Token>> ranges, OperationType type)
- {
- List<PendingFile> pending = new ArrayList<PendingFile>();
- for (SSTableReader sstable : sstables)
- {
- Descriptor desc = sstable.descriptor;
- List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
- if (sections.isEmpty())
- {
- // A reference was acquired on the sstable and we won't stream it
- sstable.releaseReference();
- continue;
- }
- CompressionInfo compression = null;
- if (sstable.compression)
- {
- compression = new CompressionInfo(sstable.getCompressionMetadata().getChunksForSections(sections),
- sstable.getCompressionMetadata().parameters);
- }
- pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type, sstable.estimatedKeysForRanges(ranges), compression));
- }
- logger.info("Stream context metadata {}, {} sstables.", pending, Iterables.size(sstables));
- return pending;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamOutSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
deleted file mode 100644
index edc07ca..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.UUIDGen;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-/**
- * This class manages the streaming of multiple files one after the other.
- */
-public class StreamOutSession extends AbstractStreamSession
-{
- private static final Logger logger = LoggerFactory.getLogger(StreamOutSession.class);
-
- // one host may have multiple stream sessions.
- private static final ConcurrentMap<UUID, StreamOutSession> streams = new NonBlockingHashMap<UUID, StreamOutSession>();
-
- public static StreamOutSession create(String table, InetAddress host, IStreamCallback callback)
- {
- return create(table, host, UUIDGen.getTimeUUID(), callback);
- }
-
- public static StreamOutSession create(String table, InetAddress host, UUID sessionId)
- {
- return create(table, host, sessionId, null);
- }
-
- public static StreamOutSession create(String table, InetAddress host, UUID sessionId, IStreamCallback callback)
- {
- StreamOutSession session = new StreamOutSession(table, host, sessionId, callback);
- streams.put(sessionId, session);
- return session;
- }
-
- public static StreamOutSession get(UUID sessionId)
- {
- return streams.get(sessionId);
- }
-
- private final Map<String, PendingFile> files = new NonBlockingHashMap<String, PendingFile>();
-
- private volatile String currentFile;
-
- private StreamOutSession(String table, InetAddress host, UUID sessionId, IStreamCallback callback)
- {
- super(table, host, sessionId, callback);
- }
-
- public void addFilesToStream(List<PendingFile> pendingFiles)
- {
- for (PendingFile pendingFile : pendingFiles)
- {
- if (logger.isDebugEnabled())
- logger.debug("Adding file {} to be streamed.", pendingFile.getFilename());
- files.put(pendingFile.getFilename(), pendingFile);
- }
- }
-
- public void retry()
- {
- streamFile(files.get(currentFile));
- }
-
- private void streamFile(PendingFile pf)
- {
- if (logger.isDebugEnabled())
- logger.debug("Streaming {} ...", pf);
- currentFile = pf.getFilename();
- MessagingService.instance().stream(new StreamHeader(table, getSessionId(), pf), getHost());
- }
-
- public void startNext()
- {
- assert files.containsKey(currentFile);
- files.get(currentFile).sstable.releaseReference();
- files.remove(currentFile);
- Iterator<PendingFile> iter = files.values().iterator();
- if (iter.hasNext())
- streamFile(iter.next());
- }
-
- protected void closeInternal(boolean success)
- {
- // Release reference on last file (or any uncompleted ones)
- for (PendingFile file : files.values())
- file.sstable.releaseReference();
- streams.remove(sessionId);
- }
-
- /** convenience method for use when testing */
- void await() throws InterruptedException
- {
- while (streams.containsKey(sessionId))
- Thread.sleep(10);
- }
-
- public Collection<PendingFile> getFiles()
- {
- return files.values();
- }
-
- public static Set<InetAddress> getDestinations()
- {
- Set<InetAddress> hosts = new HashSet<InetAddress>();
- for (StreamOutSession session : streams.values())
- {
- hosts.add(session.getHost());
- }
- return hosts;
- }
-
- public static List<PendingFile> getOutgoingFiles(InetAddress host)
- {
- List<PendingFile> list = new ArrayList<PendingFile>();
- for (Map.Entry<UUID, StreamOutSession> entry : streams.entrySet())
- {
- StreamOutSession session = entry.getValue();
- if (session.getHost().equals(host))
- list.addAll(session.getFiles());
- }
- return list;
- }
-
- public void validateCurrentFile(String file)
- {
- if (!file.equals(currentFile))
- throw new IllegalStateException(String.format("target reports current file is %s but is %s", file, currentFile));
- }
-
- public void begin()
- {
- PendingFile first = files.isEmpty() ? null : files.values().iterator().next();
- currentFile = first == null ? null : first.getFilename();
- StreamHeader header = new StreamHeader(table, getSessionId(), first, files.values());
- logger.info("Streaming to {}", getHost());
- logger.debug("Files are {}", StringUtils.join(files.values(), ","));
- MessagingService.instance().stream(header, getHost());
- }
-}