You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/20 19:07:14 UTC
[2/5] Streaming 2.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingService.java b/src/java/org/apache/cassandra/streaming/StreamingService.java
deleted file mode 100644
index ea22f82..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamingService.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-
-public class StreamingService implements StreamingServiceMBean
-{
- public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.net:type=StreamingService";
- public static final StreamingService instance = new StreamingService();
-
- private StreamingService()
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_OBJECT_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public String getStatus()
- {
- StringBuilder sb = new StringBuilder();
- sb.append("Receiving from:\n");
- for (InetAddress source : StreamInSession.getSources())
- {
- sb.append(String.format(" %s:%n", source.getHostAddress()));
- for (PendingFile pf : StreamInSession.getIncomingFiles(source))
- {
- sb.append(String.format(" %s%n", pf.toString()));
- }
- }
- sb.append("Sending to:%n");
- for (InetAddress dest : StreamOutSession.getDestinations())
- {
- sb.append(String.format(" %s:%n", dest.getHostAddress()));
- for (PendingFile pf : StreamOutSession.getOutgoingFiles(dest))
- {
- sb.append(String.format(" %s%n", pf.toString()));
- }
- }
- return sb.toString();
- }
-
- /** hosts receiving outgoing streams. */
- public Set<InetAddress> getStreamDestinations()
- {
- return StreamOutSession.getDestinations();
- }
-
- /** outgoing streams */
- public List<String> getOutgoingFiles(String host) throws IOException
- {
- List<String> files = new ArrayList<String>();
- // first, verify that host is a destination. calling StreamOutManager.get will put it in the collection
- // leading to false positives in the future.
- Set<InetAddress> existingDestinations = getStreamDestinations();
- InetAddress dest = InetAddress.getByName(host);
- if (!existingDestinations.contains(dest))
- return files;
-
- for (PendingFile f : StreamOutSession.getOutgoingFiles(dest))
- files.add(String.format("%s", f.toString()));
- return files;
- }
-
- /** hosts sending incoming streams */
- public Set<InetAddress> getStreamSources()
- {
- return StreamInSession.getSources();
- }
-
- /** details about incoming streams. */
- public List<String> getIncomingFiles(String host) throws IOException
- {
- List<String> files = new ArrayList<String>();
- for (PendingFile pf : StreamInSession.getIncomingFiles(InetAddress.getByName(host)))
- {
- files.add(String.format("%s: %s", pf.desc.ksname, pf.toString()));
- }
- return files;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java b/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
deleted file mode 100644
index 8d748f3..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamingServiceMBean.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.Set;
-
-public interface StreamingServiceMBean
-{
- /** hosts recieving outgoing streams */
- public Set<InetAddress> getStreamDestinations();
-
- /** outgoing streams */
- public List<String> getOutgoingFiles(String host) throws IOException;
-
- /** hosts sending incoming streams. */
- public Set<InetAddress> getStreamSources();
-
- /** details about incoming streams */
- public List<String> getIncomingFiles(String host) throws IOException;
-
- /** What's currently happening wrt streaming. */
- public String getStatus();
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
deleted file mode 100644
index 07fb765..0000000
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.compress;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.metrics.StreamingMetrics;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.streaming.FileStreamTask;
-import org.apache.cassandra.streaming.StreamHeader;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * FileStreamTask for compressed SSTable.
- *
- * This class sends relevant part of compressed file directly using nio if available.
- */
-public class CompressedFileStreamTask extends FileStreamTask
-{
- private static final Logger logger = LoggerFactory.getLogger(CompressedFileStreamTask.class);
- // 10MB chunks
- public static final int CHUNK_SIZE = 10*1024*1024;
-
- public CompressedFileStreamTask(StreamHeader header, InetAddress to)
- {
- super(header, to);
- }
-
- protected void stream() throws IOException
- {
- assert header.file.compressionInfo != null;
-
- SocketChannel sc = socket.getChannel();
- byte[] transferBuffer = null;
-
- // write header
- ByteBuffer headerBuffer = MessagingService.instance().constructStreamHeader(header, false, MessagingService.instance().getVersion(to));
- socket.getOutputStream().write(ByteBufferUtil.getArray(headerBuffer));
-
- RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()));
- FileChannel fc = file.getChannel();
-
- StreamingMetrics.activeStreamsOutbound.inc();
- // calculate chunks to transfer. we want to send continuous chunks altogether.
- List<Pair<Long, Long>> sections = getTransferSections(header.file.compressionInfo.chunks);
- try
- {
- long totalBytesTransferred = 0;
- // stream each of the required sections of the file
- for (Pair<Long, Long> section : sections)
- {
- // seek to the beginning of the section when socket channel is not available
- if (sc == null)
- file.seek(section.left);
- // length of the section to stream
- long length = section.right - section.left;
- // tracks write progress
- long bytesTransferred = 0;
- while (bytesTransferred < length)
- {
- int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
- long lastWrite;
- if (sc != null)
- {
- lastWrite = fc.transferTo(section.left + bytesTransferred, toTransfer, sc);
- throttle.throttleDelta(lastWrite);
- }
- else
- {
- // NIO is not available. Fall back to normal streaming.
- // This happens when inter-node encryption is turned on.
- if (transferBuffer == null)
- transferBuffer = new byte[CHUNK_SIZE];
- file.readFully(transferBuffer, 0, toTransfer);
- socket.getOutputStream().write(transferBuffer, 0, toTransfer);
- throttle.throttleDelta(toTransfer);
- lastWrite = toTransfer;
- }
- totalBytesTransferred += lastWrite;
- bytesTransferred += lastWrite;
- header.file.progress += lastWrite;
- }
-
- if (sc == null)
- socket.getOutputStream().flush();
-
- logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
- }
- StreamingMetrics.totalOutgoingBytes.inc(totalBytesTransferred);
- metrics.outgoingBytes.inc(totalBytesTransferred);
- // receive reply confirmation
- receiveReply();
- }
- finally
- {
- StreamingMetrics.activeStreamsOutbound.dec();
-
- // no matter what happens close file
- FileUtils.closeQuietly(file);
- }
- }
-
- // chunks are assumed to be sorted by offset
- private List<Pair<Long, Long>> getTransferSections(CompressionMetadata.Chunk[] chunks)
- {
- List<Pair<Long, Long>> transferSections = new ArrayList<Pair<Long, Long>>();
- Pair<Long, Long> lastSection = null;
- for (CompressionMetadata.Chunk chunk : chunks)
- {
- if (lastSection != null)
- {
- if (chunk.offset == lastSection.right)
- {
- // extend previous section to end of this chunk
- lastSection = Pair.create(lastSection.left, chunk.offset + chunk.length + 4); // 4 bytes for CRC
- }
- else
- {
- transferSections.add(lastSection);
- lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
- }
- }
- else
- {
- lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
- }
- }
- if (lastSection != null)
- transferSections.add(lastSection);
- return transferSections;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 6fe1793..3305f50 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -17,9 +17,12 @@
*/
package org.apache.cassandra.streaming.compress;
-import java.io.*;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.Iterator;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
new file mode 100644
index 0000000..f3e511b
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.compress;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReader;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * StreamReader that reads from streamed compressed SSTable
+ */
+public class CompressedStreamReader extends StreamReader
+{
+ protected final CompressionInfo compressionInfo;
+
+ public CompressedStreamReader(FileMessageHeader header, StreamSession session)
+ {
+ super(header, session);
+ this.compressionInfo = header.compressionInfo;
+ }
+
+ /**
+ * @return SSTable transferred
+ * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
+ */
+ @Override
+ public SSTableReader read(ReadableByteChannel channel) throws IOException
+ {
+ long totalSize = totalSize();
+ CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
+
+ Pair<String, String> kscf = Schema.instance.getCF(cfId);
+ ColumnFamilyStore cfs = Table.open(kscf.left).getColumnFamilyStore(kscf.right);
+ Directories.DataDirectory localDir = cfs.directories.getLocationCapableOfSize(totalSize);
+ if (localDir == null)
+ throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
+ desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
+
+ SSTableWriter writer = new SSTableWriter(desc.filenameFor(Component.DATA), estimatedKeys);
+ try
+ {
+ BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
+
+ for (Pair<Long, Long> section : sections)
+ {
+ long length = section.right - section.left;
+ // skip to beginning of section inside chunk
+ cis.position(section.left);
+ in.reset(0);
+ while (in.getBytesRead() < length)
+ {
+ writeRow(writer, in, cfs);
+ // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
+ session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
+ }
+ }
+ return writer.closeAndOpenReader();
+ }
+ catch (Throwable e)
+ {
+ writer.abort();
+ if (e instanceof IOException)
+ throw (IOException) e;
+ else
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ protected long totalSize()
+ {
+ long size = 0;
+ // calculate total length of transferring chunks
+ for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+ size += chunk.length + 4; // 4 bytes for CRC
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
new file mode 100644
index 0000000..80fcef5
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.compress;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamWriter;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * StreamWriter for compressed SSTable.
+ */
+public class CompressedStreamWriter extends StreamWriter
+{
+ public static final int CHUNK_SIZE = 10 * 1024 * 1024;
+
+ private final CompressionInfo compressionInfo;
+
+ public CompressedStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, CompressionInfo compressionInfo, StreamSession session)
+ {
+ super(sstable, sections, session);
+ this.compressionInfo = compressionInfo;
+ }
+
+ @Override
+ public void write(WritableByteChannel channel) throws IOException
+ {
+ long totalSize = totalSize();
+ RandomAccessReader file = sstable.openDataReader();
+ FileChannel fc = file.getChannel();
+
+ long progress = 0L;
+ // calculate chunks to transfer. we want to send continuous chunks altogether.
+ List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
+ try
+ {
+ // stream each of the required sections of the file
+ for (Pair<Long, Long> section : sections)
+ {
+ // length of the section to stream
+ long length = section.right - section.left;
+ // tracks write progress
+ long bytesTransferred = 0;
+ while (bytesTransferred < length)
+ {
+ int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
+ limiter.acquire(toTransfer);
+ long lastWrite = fc.transferTo(section.left + bytesTransferred, toTransfer, channel);
+ bytesTransferred += lastWrite;
+ progress += lastWrite;
+ session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
+ }
+ }
+ }
+ finally
+ {
+ // no matter what happens close file
+ FileUtils.closeQuietly(file);
+ }
+
+ sstable.releaseReference();
+ }
+
+ @Override
+ protected long totalSize()
+ {
+ long size = 0;
+ // calculate total length of transferring chunks
+ for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+ size += chunk.length + 4; // 4 bytes for CRC
+ return size;
+ }
+
+ // chunks are assumed to be sorted by offset
+ private List<Pair<Long, Long>> getTransferSections(CompressionMetadata.Chunk[] chunks)
+ {
+ List<Pair<Long, Long>> transferSections = new ArrayList<>();
+ Pair<Long, Long> lastSection = null;
+ for (CompressionMetadata.Chunk chunk : chunks)
+ {
+ if (lastSection != null)
+ {
+ if (chunk.offset == lastSection.right)
+ {
+ // extend previous section to end of this chunk
+ lastSection = Pair.create(lastSection.left, chunk.offset + chunk.length + 4); // 4 bytes for CRC
+ }
+ else
+ {
+ transferSections.add(lastSection);
+ lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
+ }
+ }
+ else
+ {
+ lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
+ }
+ }
+ if (lastSection != null)
+ transferSections.add(lastSection);
+ return transferSections;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
new file mode 100644
index 0000000..b561abc
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/CompleteMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.streaming.StreamSession;
+
+public class CompleteMessage extends StreamMessage
+{
+ public static Serializer<CompleteMessage> serializer = new Serializer<CompleteMessage>()
+ {
+ public CompleteMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+ {
+ return new CompleteMessage();
+ }
+
+ public void serialize(CompleteMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException {}
+ };
+
+ public CompleteMessage()
+ {
+ super(Type.COMPLETE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/FileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessage.java b/src/java/org/apache/cassandra/streaming/messages/FileMessage.java
new file mode 100644
index 0000000..fe05eac
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessage.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.streaming.StreamReader;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamWriter;
+import org.apache.cassandra.streaming.compress.CompressedStreamReader;
+import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
+import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * FileMessage is used to transfer/receive the part(or whole) of a SSTable data file.
+ */
+public class FileMessage extends StreamMessage
+{
+ public static Serializer<FileMessage> serializer = new Serializer<FileMessage>()
+ {
+ public FileMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+ {
+ DataInputStream input = new DataInputStream(Channels.newInputStream(in));
+ FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version);
+ StreamReader reader = header.compressionInfo == null ? new StreamReader(header, session)
+ : new CompressedStreamReader(header, session);
+
+ try
+ {
+ return new FileMessage(reader.read(in), header);
+ }
+ catch (Throwable e)
+ {
+ session.doRetry(header, e);
+ return null;
+ }
+ }
+
+ public void serialize(FileMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException
+ {
+ DataOutput output = new DataOutputStream(Channels.newOutputStream(out));
+ FileMessageHeader.serializer.serialize(message.header, output, version);
+ StreamWriter writer = message.header.compressionInfo == null ?
+ new StreamWriter(message.sstable, message.header.sections, session) :
+ new CompressedStreamWriter(message.sstable,
+ message.header.sections,
+ message.header.compressionInfo, session);
+ writer.write(out);
+ session.fileSent(message.header);
+ }
+ };
+
+ public final FileMessageHeader header;
+ public final SSTableReader sstable;
+
+ public FileMessage(SSTableReader sstable, FileMessageHeader header)
+ {
+ super(Type.FILE);
+ this.header = header;
+ this.sstable = sstable;
+ }
+
+ public FileMessage(SSTableReader sstable, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections)
+ {
+ super(Type.FILE);
+ this.sstable = sstable;
+
+ CompressionInfo compressionInfo = null;
+ if (sstable.compression)
+ {
+ CompressionMetadata meta = sstable.getCompressionMetadata();
+ compressionInfo = new CompressionInfo(meta.getChunksForSections(sections), meta.parameters);
+ }
+ this.header = new FileMessageHeader(sstable.metadata.cfId,
+ sequenceNumber,
+ sstable.descriptor.version.toString(),
+ estimatedKeys,
+ sections,
+ compressionInfo);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
new file mode 100644
index 0000000..761e086
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * StreamingFileHeader is appended before sending actual data to describe what it's sending.
+ */
+public class FileMessageHeader
+{
+ public static IVersionedSerializer<FileMessageHeader> serializer = new FileMessageHeaderSerializer();
+
+ public final UUID cfId;
+ public final int sequenceNumber;
+ /** SSTable version */
+ public final String version;
+ public final long estimatedKeys;
+ public final List<Pair<Long, Long>> sections;
+ public final CompressionInfo compressionInfo;
+
+ public FileMessageHeader(UUID cfId,
+ int sequenceNumber,
+ String version,
+ long estimatedKeys,
+ List<Pair<Long, Long>> sections,
+ CompressionInfo compressionInfo)
+ {
+ this.cfId = cfId;
+ this.sequenceNumber = sequenceNumber;
+ this.version = version;
+ this.estimatedKeys = estimatedKeys;
+ this.sections = sections;
+ this.compressionInfo = compressionInfo;
+ }
+
+ /**
+ * @return total file size to transfer in bytes
+ */
+ public long size()
+ {
+ long size = 0;
+ if (compressionInfo != null)
+ {
+ // calculate total length of transferring chunks
+ for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+ size += chunk.length + 4; // 4 bytes for CRC
+ }
+ else
+ {
+ for (Pair<Long, Long> section : sections)
+ size += section.right - section.left;
+ }
+ return size;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ FileMessageHeader that = (FileMessageHeader) o;
+ return sequenceNumber == that.sequenceNumber && cfId.equals(that.cfId);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = cfId.hashCode();
+ result = 31 * result + sequenceNumber;
+ return result;
+ }
+
+ static class FileMessageHeaderSerializer implements IVersionedSerializer<FileMessageHeader>
+ {
+ public void serialize(FileMessageHeader header, DataOutput out, int version) throws IOException
+ {
+ UUIDSerializer.serializer.serialize(header.cfId, out, version);
+ out.writeInt(header.sequenceNumber);
+ out.writeUTF(header.version);
+ out.writeLong(header.estimatedKeys);
+
+ out.writeInt(header.sections.size());
+ for (Pair<Long, Long> section : header.sections)
+ {
+ out.writeLong(section.left);
+ out.writeLong(section.right);
+ }
+ CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
+ }
+
+ public FileMessageHeader deserialize(DataInput in, int version) throws IOException
+ {
+ UUID cfId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+ int sequenceNumber = in.readInt();
+ String sstableVersion = in.readUTF();
+ long estimatedKeys = in.readLong();
+ int count = in.readInt();
+ List<Pair<Long, Long>> sections = new ArrayList<>(count);
+ for (int k = 0; k < count; k++)
+ sections.add(Pair.create(in.readLong(), in.readLong()));
+ CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, MessagingService.current_version);
+ return new FileMessageHeader(cfId, sequenceNumber, sstableVersion, estimatedKeys, sections, compressionInfo);
+ }
+
+ public long serializedSize(FileMessageHeader header, int version)
+ {
+ long size = UUIDSerializer.serializer.serializedSize(header.cfId, version);
+ size += TypeSizes.NATIVE.sizeof(header.sequenceNumber);
+ size += TypeSizes.NATIVE.sizeof(header.version);
+ size += TypeSizes.NATIVE.sizeof(header.estimatedKeys);
+
+ size += TypeSizes.NATIVE.sizeof(header.sections.size());
+ for (Pair<Long, Long> section : header.sections)
+ {
+ size += TypeSizes.NATIVE.sizeof(section.left);
+ size += TypeSizes.NATIVE.sizeof(section.right);
+ }
+ size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
+ return size;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
new file mode 100644
index 0000000..ba94592
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/PrepareMessage.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.*;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.cassandra.streaming.StreamRequest;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+
+public class PrepareMessage extends StreamMessage
+{
+ public static Serializer<PrepareMessage> serializer = new Serializer<PrepareMessage>()
+ {
+ public PrepareMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+ {
+ DataInput input = new DataInputStream(Channels.newInputStream(in));
+ PrepareMessage message = new PrepareMessage();
+ // requests
+ int numRequests = input.readInt();
+ for (int i = 0; i < numRequests; i++)
+ message.requests.add(StreamRequest.serializer.deserialize(input, version));
+ // summaries
+ int numSummaries = input.readInt();
+ for (int i = 0; i < numSummaries; i++)
+ message.summaries.add(StreamSummary.serializer.deserialize(input, version));
+ return message;
+ }
+
+ public void serialize(PrepareMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException
+ {
+ DataOutput output = new DataOutputStream(Channels.newOutputStream(out));
+ // requests
+ output.writeInt(message.requests.size());
+ for (StreamRequest request : message.requests)
+ StreamRequest.serializer.serialize(request, output, version);
+ // summaries
+ output.writeInt(message.summaries.size());
+ for (StreamSummary summary : message.summaries)
+ StreamSummary.serializer.serialize(summary, output, version);
+ }
+ };
+
+ /**
+ * Streaming requests
+ */
+ public final Collection<StreamRequest> requests = new ArrayList<>();
+
+ /**
+ * Summaries of streaming out
+ */
+ public final Collection<StreamSummary> summaries = new ArrayList<>();
+
+ public PrepareMessage()
+ {
+ super(Type.PREPARE);
+ }
+
+ @Override
+ public String toString()
+ {
+ final StringBuilder sb = new StringBuilder("PrepareMessage{");
+ sb.append(requests.size()).append(" requests, ");
+ int totalFile = 0;
+ for (StreamSummary summary : summaries)
+ totalFile += summary.files;
+ sb.append(totalFile).append(" files receiving");
+ sb.append('}');
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
new file mode 100644
index 0000000..67b4dee
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.*;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.UUID;
+
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+public class RetryMessage extends StreamMessage
+{
+ public static Serializer<RetryMessage> serializer = new Serializer<RetryMessage>()
+ {
+ public RetryMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+ {
+ DataInput input = new DataInputStream(Channels.newInputStream(in));
+ return new RetryMessage(UUIDSerializer.serializer.deserialize(input, MessagingService.current_version), input.readInt());
+ }
+
+ public void serialize(RetryMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException
+ {
+ DataOutput output = new DataOutputStream(Channels.newOutputStream(out));
+ UUIDSerializer.serializer.serialize(message.cfId, output, MessagingService.current_version);
+ output.writeInt(message.sequenceNumber);
+ }
+ };
+
+ public final UUID cfId;
+ public final int sequenceNumber;
+
+ public RetryMessage(UUID cfId, int sequenceNumber)
+ {
+ super(Type.RETRY);
+ this.cfId = cfId;
+ this.sequenceNumber = sequenceNumber;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
new file mode 100644
index 0000000..450f67d
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/SessionFailedMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.streaming.StreamSession;
+
+public class SessionFailedMessage extends StreamMessage
+{
+ public static Serializer<SessionFailedMessage> serializer = new Serializer<SessionFailedMessage>()
+ {
+ public SessionFailedMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+ {
+ return new SessionFailedMessage();
+ }
+
+ public void serialize(SessionFailedMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException {}
+ };
+
+ public SessionFailedMessage()
+ {
+ super(Type.SESSION_FAILED);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
new file mode 100644
index 0000000..d1e797c
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * StreamInitMessage is first sent from the node where {@link org.apache.cassandra.streaming.StreamSession} is started,
+ * to initiate corresponding {@link org.apache.cassandra.streaming.StreamSession} on the other side.
+ */
+public class StreamInitMessage
+{
+ public static IVersionedSerializer<StreamInitMessage> serializer = new StreamInitMessageSerializer();
+
+ public final UUID planId;
+ public final String description;
+
+ public StreamInitMessage(UUID planId, String description)
+ {
+ this.planId = planId;
+ this.description = description;
+ }
+
+ /**
+ * Create serialized message.
+ *
+ * @param compress true if message is compressed
+ * @param version Streaming protocol version
+ * @return serialized message in ByteBuffer format
+ */
+ public ByteBuffer createMessage(boolean compress, int version)
+ {
+ int header = 0;
+ // set compression bit.
+ if (compress)
+ header |= 4;
+ // set streaming bit
+ header |= 8;
+ // Setting up the version bit
+ header |= (version << 8);
+
+ /* Adding the StreamHeader which contains the session Id along
+ * with the pendingfile info for the stream.
+ * | Session Id | Pending File Size | Pending File | Bool more files |
+ * | No. of Pending files | Pending Files ... |
+ */
+ byte[] bytes;
+ try
+ {
+ int size = (int)StreamInitMessage.serializer.serializedSize(this, version);
+ DataOutputBuffer buffer = new DataOutputBuffer(size);
+ StreamInitMessage.serializer.serialize(this, buffer, version);
+ bytes = buffer.getData();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ assert bytes.length > 0;
+
+ ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + bytes.length);
+ buffer.putInt(MessagingService.PROTOCOL_MAGIC);
+ buffer.putInt(header);
+ buffer.put(bytes);
+ buffer.flip();
+ return buffer;
+ }
+
+ private static class StreamInitMessageSerializer implements IVersionedSerializer<StreamInitMessage>
+ {
+ public void serialize(StreamInitMessage message, DataOutput out, int version) throws IOException
+ {
+ UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version);
+ out.writeUTF(message.description);
+ }
+
+ public StreamInitMessage deserialize(DataInput in, int version) throws IOException
+ {
+ UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+ return new StreamInitMessage(planId, in.readUTF());
+ }
+
+ public long serializedSize(StreamInitMessage message, int version)
+ {
+ long size = UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
+ size += TypeSizes.NATIVE.sizeof(message.description);
+ return size;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
new file mode 100644
index 0000000..f737675
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.cassandra.streaming.StreamSession;
+
+/**
+ * StreamMessage is an abstract base class that every messages in streaming protocol inherit.
+ *
+ * Every message carries message type({@link Type}) and streaming protocol version byte.
+ */
+public abstract class StreamMessage
+{
+ /** Streaming protocol version */
+ public static final int CURRENT_VERSION = 1;
+
+ public static void serialize(StreamMessage message, WritableByteChannel out, int version, StreamSession session) throws IOException
+ {
+ ByteBuffer buff = ByteBuffer.allocate(1);
+ // message type
+ buff.put(message.type.type);
+ buff.flip();
+ out.write(buff);
+ message.type.serializer.serialize(message, out, version, session);
+ }
+
+ public static StreamMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException
+ {
+ ByteBuffer buff = ByteBuffer.allocate(1);
+ in.read(buff);
+ buff.flip();
+ Type type = Type.get(buff.get());
+ return type.serializer.deserialize(in, version, session);
+ }
+
+ /** StreamMessage serializer */
+ public static interface Serializer<V extends StreamMessage>
+ {
+ V deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException;
+ void serialize(V message, WritableByteChannel out, int version, StreamSession session) throws IOException;
+ }
+
+ /** StreamMessage types */
+ public static enum Type
+ {
+ PREPARE(1, 5, PrepareMessage.serializer),
+ FILE(2, 0, FileMessage.serializer),
+ RETRY(3, 1, RetryMessage.serializer),
+ COMPLETE(4, 4, CompleteMessage.serializer),
+ SESSION_FAILED(5, 5, SessionFailedMessage.serializer);
+
+ public static Type get(byte type)
+ {
+ for (Type t : Type.values())
+ {
+ if (t.type == type)
+ return t;
+ }
+ throw new IllegalArgumentException("Unknown type " + type);
+ }
+
+ private final byte type;
+ public final int priority;
+ public final Serializer<StreamMessage> serializer;
+
+ @SuppressWarnings("unchecked")
+ private Type(int type, int priority, Serializer serializer)
+ {
+ this.type = (byte) type;
+ this.priority = priority;
+ this.serializer = serializer;
+ }
+ }
+
+ public final Type type;
+
+ protected StreamMessage(Type type)
+ {
+ this.type = type;
+ }
+
+ /**
+ * @return priority of this message. higher value, higher priority.
+ */
+ public int getPriority()
+ {
+ return type.priority;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index d2f3c4d..12650a2 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -24,6 +24,11 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.*;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,17 +37,10 @@ import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableLoader;
-import org.apache.cassandra.streaming.PendingFile;
+import org.apache.cassandra.streaming.*;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.OutputHandler;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-
-import com.google.common.util.concurrent.Uninterruptibles;
public class BulkLoader
{
@@ -61,52 +59,20 @@ public class BulkLoader
public static void main(String args[])
{
LoaderOptions options = LoaderOptions.parseArgs(args);
+ OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
+ SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd), handler);
+ DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
+ StreamResultFuture future = loader.stream(options.ignores);
+ future.addEventListener(new ProgressIndicator(handler));
try
{
- OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
- SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(handler, options.hosts, options.rpcPort, options.user, options.passwd), handler);
- DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
- SSTableLoader.LoaderFuture future = loader.stream(options.ignores);
-
- if (options.noProgress)
- {
- future.get();
- }
- else
- {
- ProgressIndicator indicator = new ProgressIndicator(future.getPendingFiles());
- indicator.start();
- System.out.println("");
- boolean printEnd = false;
- while (!future.isDone())
- {
- if (indicator.printProgress())
- {
- // We're done with streaming
- System.out.println("\nWaiting for targets to rebuild indexes ...");
- printEnd = true;
- future.get();
- assert future.isDone();
- }
- else
- {
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- }
- }
- if (!printEnd)
- indicator.printProgress();
- if (future.hadFailures())
- {
- System.err.println("Streaming to the following hosts failed:");
- System.err.println(future.getFailedHosts());
- System.exit(1);
- }
- }
-
+ future.get();
System.exit(0); // We need that to stop non daemonized threads
}
catch (Exception e)
{
+ System.err.println("Streaming to the following hosts failed:");
+ System.err.println(loader.getFailedHosts());
System.err.println(e.getMessage());
if (options.debug)
e.printStackTrace(System.err);
@@ -115,62 +81,63 @@ public class BulkLoader
}
// Return true when everything is at 100%
- static class ProgressIndicator
+ static class ProgressIndicator implements StreamEventHandler
{
- private final Map<InetAddress, Collection<PendingFile>> filesByHost;
+ private final Set<ProgressInfo> progresses = new HashSet<>();
+ private final OutputHandler handler;
+
private long start;
private long lastProgress;
private long lastTime;
- public ProgressIndicator(Map<InetAddress, Collection<PendingFile>> filesByHost)
- {
- this.filesByHost = new HashMap<InetAddress, Collection<PendingFile>>(filesByHost);
- }
-
- public void start()
+ public ProgressIndicator(OutputHandler handler)
{
+ this.handler = handler;
start = lastTime = System.nanoTime();
}
- public boolean printProgress()
+ public void onSuccess(StreamState finalState) {}
+ public void onFailure(Throwable t) {}
+
+ public void handleStreamEvent(StreamEvent event)
{
- boolean done = true;
- StringBuilder sb = new StringBuilder();
- sb.append("\rprogress: ");
- long totalProgress = 0;
- long totalSize = 0;
- for (Map.Entry<InetAddress, Collection<PendingFile>> entry : filesByHost.entrySet())
+ if (event.eventType == StreamEvent.Type.FILE_PROGRESS)
{
- long progress = 0;
- long size = 0;
- int completed = 0;
- Collection<PendingFile> pendings = entry.getValue();
- for (PendingFile f : pendings)
+ ProgressInfo progressInfo = ((StreamEvent.ProgressEvent) event).progress;
+
+ // update progress
+ if (progresses.contains(progressInfo))
+ progresses.remove(progressInfo);
+ progresses.add(progressInfo);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("\rprogress: ");
+
+ long totalProgress = 0;
+ long totalSize = 0;
+ long completed = 0;
+ for (ProgressInfo entry : progresses)
{
- progress += f.progress;
- size += f.size;
- if (f.progress == f.size)
+ if (entry.currentBytes == entry.totalBytes)
completed++;
+ totalProgress += entry.currentBytes;
+ totalSize += entry.totalBytes;
+ sb.append("[").append(entry.peer);
+ sb.append(" ").append(completed).append("/").append(progresses.size());
+ sb.append(" (").append(entry.totalBytes == 0 ? 100L : entry.currentBytes * 100L / entry.totalBytes).append(")] ");
}
- totalProgress += progress;
- totalSize += size;
- if (completed != pendings.size())
- done = false;
- sb.append("[").append(entry.getKey());
- sb.append(" ").append(completed).append("/").append(pendings.size());
- sb.append(" (").append(size == 0 ? 100L : progress * 100L / size).append(")] ");
+ long time = System.nanoTime();
+ long deltaTime = TimeUnit.NANOSECONDS.toMillis(time - lastTime);
+ lastTime = time;
+ long deltaProgress = totalProgress - lastProgress;
+ lastProgress = totalProgress;
+
+ sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append(" - ");
+ sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
+ sb.append(" (avg: ").append(mbPerSec(totalProgress, TimeUnit.NANOSECONDS.toMillis(time - start))).append("MB/s)]");
+
+ handler.output(sb.toString());
}
- long time = System.nanoTime();
- long deltaTime = TimeUnit.NANOSECONDS.toMillis(time - lastTime);
- lastTime = time;
- long deltaProgress = totalProgress - lastProgress;
- lastProgress = totalProgress;
-
- sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append(" - ");
- sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
- sb.append(" (avg: ").append(mbPerSec(totalProgress, TimeUnit.NANOSECONDS.toMillis(time - start))).append("MB/s)]");
- System.out.print(sb.toString());
- return done;
}
private int mbPerSec(long bytes, long timeInMs)
@@ -182,13 +149,13 @@ public class BulkLoader
static class ExternalClient extends SSTableLoader.Client
{
- private final Set<String> knownCfs = new HashSet<String>();
+ private final Set<String> knownCfs = new HashSet<>();
private final Set<InetAddress> hosts;
private final int rpcPort;
private final String user;
private final String passwd;
- public ExternalClient(OutputHandler outputHandler, Set<InetAddress> hosts, int port, String user, String passwd)
+ public ExternalClient(Set<InetAddress> hosts, int port, String user, String passwd)
{
super();
this.hosts = hosts;
@@ -213,7 +180,7 @@ public class BulkLoader
for (TokenRange tr : client.describe_ring(keyspace))
{
- Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
+ Range<Token> range = new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
for (String ep : tr.endpoints)
{
addRangeForEndpoint(range, InetAddress.getByName(ep));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index b0b9e23..48ae998 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -45,6 +45,9 @@ import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
import org.apache.cassandra.net.MessagingServiceMBean;
import org.apache.cassandra.service.CacheServiceMBean;
import org.apache.cassandra.service.StorageProxyMBean;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.Pair;
@@ -614,56 +617,34 @@ public class NodeCmd
public void printNetworkStats(final InetAddress addr, PrintStream outs)
{
outs.printf("Mode: %s%n", probe.getOperationMode());
- Set<InetAddress> hosts = addr == null ? probe.getStreamDestinations() : new HashSet<InetAddress>(){{add(addr);}};
- if (hosts.size() == 0)
+ Set<StreamState> statuses = probe.getStreamStatus();
+ if (statuses.isEmpty())
outs.println("Not sending any streams.");
- for (InetAddress host : hosts)
+ for (StreamState status : statuses)
{
- try
+ outs.printf("%s %s%n", status.description, status.planId.toString());
+ for (SessionInfo info : status.sessions)
{
- List<String> files = probe.getFilesDestinedFor(host);
- if (files.size() > 0)
+ outs.printf(" %s%n", info.peer.toString());
+ if (!info.receivingSummaries.isEmpty())
{
- outs.printf("Streaming to: %s%n", host);
- for (String file : files)
- outs.printf(" %s%n", file);
+ outs.printf(" Receiving %d files, %d bytes total%n", info.getTotalFilesToReceive(), info.getTotalSizeToReceive());
+ for (ProgressInfo progress : info.getReceivingFiles())
+ {
+ outs.printf(" %s%n", progress.toString());
+ }
}
- else
+ if (!info.sendingSummaries.isEmpty())
{
- outs.printf(" Nothing streaming to %s%n", host);
+ outs.printf(" Sending %d files, %d bytes total%n", info.getTotalFilesToSend(), info.getTotalSizeToSend());
+ for (ProgressInfo progress : info.getSendingFiles())
+ {
+ outs.printf(" %s%n", progress.toString());
+ }
}
}
- catch (IOException ex)
- {
- outs.printf(" Error retrieving file data for %s%n", host);
- }
}
- hosts = addr == null ? probe.getStreamSources() : new HashSet<InetAddress>(){{add(addr); }};
- if (hosts.size() == 0)
- outs.println("Not receiving any streams.");
- for (InetAddress host : hosts)
- {
- try
- {
- List<String> files = probe.getIncomingFiles(host);
- if (files.size() > 0)
- {
- outs.printf("Streaming from: %s%n", host);
- for (String file : files)
- outs.printf(" %s%n", file);
- }
- else
- {
- outs.printf(" Nothing streaming from %s%n", host);
- }
- }
- catch (IOException ex)
- {
- outs.printf(" Error retrieving file data for %s%n", host);
- }
- }
-
outs.printf("Read Repair Statistics:%nAttempted: %d%nMismatch (Blocking): %d%nMismatch (Background): %d%n", probe.getReadRepairAttempted(), probe.getReadRepairRepairedBlocking(), probe.getReadRepairRepairedBackground());
MessagingServiceMBean ms = probe.msProxy;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index af94d14..452e865 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -50,8 +50,8 @@ import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessagingServiceMBean;
import org.apache.cassandra.service.*;
-import org.apache.cassandra.streaming.StreamingService;
-import org.apache.cassandra.streaming.StreamingServiceMBean;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.StreamManagerMBean;
import org.apache.cassandra.utils.SimpleCondition;
/**
@@ -73,7 +73,7 @@ public class NodeProbe
private StorageServiceMBean ssProxy;
private MemoryMXBean memProxy;
private RuntimeMXBean runtimeProxy;
- private StreamingServiceMBean streamProxy;
+ private StreamManagerMBean streamProxy;
public MessagingServiceMBean msProxy;
private FailureDetectorMBean fdProxy;
private CacheServiceMBean cacheService;
@@ -150,8 +150,8 @@ public class NodeProbe
ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
name = new ObjectName(MessagingService.MBEAN_NAME);
msProxy = JMX.newMBeanProxy(mbeanServerConn, name, MessagingServiceMBean.class);
- name = new ObjectName(StreamingService.MBEAN_OBJECT_NAME);
- streamProxy = JMX.newMBeanProxy(mbeanServerConn, name, StreamingServiceMBean.class);
+ name = new ObjectName(StreamManagerMBean.OBJECT_NAME);
+ streamProxy = JMX.newMBeanProxy(mbeanServerConn, name, StreamManagerMBean.class);
name = new ObjectName(CompactionManager.MBEAN_OBJECT_NAME);
compactionProxy = JMX.newMBeanProxy(mbeanServerConn, name, CompactionManagerMBean.class);
name = new ObjectName(FailureDetector.MBEAN_NAME);
@@ -545,24 +545,9 @@ public class NodeProbe
return cfsProxy.getSSTablesForKey(key);
}
- public Set<InetAddress> getStreamDestinations()
+ public Set<StreamState> getStreamStatus()
{
- return streamProxy.getStreamDestinations();
- }
-
- public List<String> getFilesDestinedFor(InetAddress host) throws IOException
- {
- return streamProxy.getOutgoingFiles(host.getHostAddress());
- }
-
- public Set<InetAddress> getStreamSources()
- {
- return streamProxy.getStreamSources();
- }
-
- public List<String> getIncomingFiles(InetAddress host) throws IOException
- {
- return streamProxy.getIncomingFiles(host.getHostAddress());
+ return streamProxy.getCurrentStreams();
}
public String getOperationMode()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 7bbe72d..17fa3fd 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.OperationType;
import static org.junit.Assert.*;
@@ -66,7 +65,7 @@ public class BootStrapperTest extends SchemaLoader
TokenMetadata tmd = ss.getTokenMetadata();
assertEquals(numOldNodes, tmd.sortedTokens().size());
- RangeStreamer s = new RangeStreamer(tmd, myEndpoint, OperationType.BOOTSTRAP);
+ RangeStreamer s = new RangeStreamer(tmd, myEndpoint, "Bootstrap");
IFailureDetector mockFailureDetector = new IFailureDetector()
{
public boolean isAlive(InetAddress ep)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 4f3217c..1b3eb48 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -27,29 +27,23 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.*;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
-import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.sink.IMessageSink;
import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.streaming.StreamUtil;
import org.apache.cassandra.utils.FBUtilities;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class RemoveTest
{
@@ -119,9 +113,6 @@ public class RemoveTest
@Test
public void testRemoveHostId() throws InterruptedException
{
- ReplicationSink rSink = new ReplicationSink();
- SinkManager.add(rSink);
-
// start removal in background and send replication confirmations
final AtomicBoolean success = new AtomicBoolean(false);
Thread remover = new Thread()
@@ -159,25 +150,4 @@ public class RemoveTest
assertTrue(success.get());
assertTrue(tmd.getLeavingEndpoints().isEmpty());
}
-
- /**
- * sink that captures STREAM_REQUEST messages and calls finishStreamRequest on it
- */
- class ReplicationSink implements IMessageSink
- {
- public MessageIn handleMessage(MessageIn msg, int id, InetAddress to)
- {
- if (!msg.verb.equals(MessagingService.Verb.STREAM_REQUEST))
- return msg;
-
- StreamUtil.finishStreamRequest(msg, to);
-
- return null;
- }
-
- public MessageOut handleMessage(MessageOut msg, int id, InetAddress to)
- {
- return msg;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/51511697/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/BootstrapTest.java b/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
deleted file mode 100644
index 6578dde..0000000
--- a/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.streaming;
-
-import static junit.framework.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.utils.Pair;
-
-import java.util.Arrays;
-
-import org.junit.Test;
-
-public class BootstrapTest extends SchemaLoader
-{
- @Test
- public void testGetNewNames() throws IOException
- {
- String ver = Descriptor.Version.current_version;
- Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Keyspace1-Standard1-" + ver +"-500-Data.db").toString());
- // assert !desc.isLatestVersion; // minimum compatible version -- for now it is the latest as well
- PendingFile inContext = new PendingFile(null, desc, "Data.db", Arrays.asList(Pair.create(0L, 1L)), OperationType.BOOTSTRAP);
-
- PendingFile outContext = StreamIn.getContextMapping(inContext);
- // filename and generation are expected to have changed
- assert !inContext.getFilename().equals(outContext.getFilename());
-
- // nothing else should
- assertEquals(inContext.component, outContext.component);
- assertEquals(desc.ksname, outContext.desc.ksname);
- assertEquals(desc.cfname, outContext.desc.cfname);
- assertEquals(desc.version, outContext.desc.version);
- }
-}