You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2018/03/15 23:42:50 UTC
[2/5] cassandra git commit: Abstract streaming for pluggable storage
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
deleted file mode 100644
index 13a3358..0000000
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.messages;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.CompactEndpointSerializationHelper;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.compress.CompressionInfo;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDSerializer;
-
-/**
- * StreamingFileHeader is appended before sending actual data to describe what it's sending.
- */
-public class FileMessageHeader
-{
- public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer();
-
- public final TableId tableId;
- public UUID planId;
- public int sessionIndex;
- public final int sequenceNumber;
- /** SSTable version */
- public final Version version;
-
- /** SSTable format **/
- public final SSTableFormat.Type format;
- public final long estimatedKeys;
- public final List<Pair<Long, Long>> sections;
- /**
- * Compression info for SSTable to send. Can be null if SSTable is not compressed.
- * On sender, this field is always null to avoid holding large number of Chunks.
- * Use compressionMetadata instead.
- */
- public final CompressionInfo compressionInfo;
- private final CompressionMetadata compressionMetadata;
- public final long repairedAt;
- public final UUID pendingRepair;
- public final int sstableLevel;
- public final SerializationHeader.Component header;
- public final InetAddressAndPort sender;
-
- /* cached size value */
- private transient final long size;
-
- private FileMessageHeader(TableId tableId,
- InetAddressAndPort sender,
- UUID planId,
- int sessionIndex,
- int sequenceNumber,
- Version version,
- SSTableFormat.Type format,
- long estimatedKeys,
- List<Pair<Long, Long>> sections,
- CompressionInfo compressionInfo,
- long repairedAt,
- UUID pendingRepair,
- int sstableLevel,
- SerializationHeader.Component header)
- {
- this.tableId = tableId;
- this.sender = sender;
- this.planId = planId;
- this.sessionIndex = sessionIndex;
- this.sequenceNumber = sequenceNumber;
- this.version = version;
- this.format = format;
- this.estimatedKeys = estimatedKeys;
- this.sections = sections;
- this.compressionInfo = compressionInfo;
- this.compressionMetadata = null;
- this.repairedAt = repairedAt;
- this.pendingRepair = pendingRepair;
- this.sstableLevel = sstableLevel;
- this.header = header;
- this.size = calculateSize();
- }
-
- public FileMessageHeader(TableId tableId,
- InetAddressAndPort sender,
- UUID planId,
- int sessionIndex,
- int sequenceNumber,
- Version version,
- SSTableFormat.Type format,
- long estimatedKeys,
- List<Pair<Long, Long>> sections,
- CompressionMetadata compressionMetadata,
- long repairedAt,
- UUID pendingRepair,
- int sstableLevel,
- SerializationHeader.Component header)
- {
- this.tableId = tableId;
- this.sender = sender;
- this.planId = planId;
- this.sessionIndex = sessionIndex;
- this.sequenceNumber = sequenceNumber;
- this.version = version;
- this.format = format;
- this.estimatedKeys = estimatedKeys;
- this.sections = sections;
- this.compressionInfo = null;
- this.compressionMetadata = compressionMetadata;
- this.repairedAt = repairedAt;
- this.pendingRepair = pendingRepair;
- this.sstableLevel = sstableLevel;
- this.header = header;
- this.size = calculateSize();
- }
-
- public boolean isCompressed()
- {
- return compressionInfo != null || compressionMetadata != null;
- }
-
- /**
- * @return total file size to transfer in bytes
- */
- public long size()
- {
- return size;
- }
-
- private long calculateSize()
- {
- long transferSize = 0;
- if (compressionInfo != null)
- {
- // calculate total length of transferring chunks
- for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
- transferSize += chunk.length + 4; // 4 bytes for CRC
- }
- else if (compressionMetadata != null)
- {
- transferSize = compressionMetadata.getTotalSizeForSections(sections);
- }
- else
- {
- for (Pair<Long, Long> section : sections)
- transferSize += section.right - section.left;
- }
- return transferSize;
- }
-
- @Override
- public String toString()
- {
- final StringBuilder sb = new StringBuilder("Header (");
- sb.append("tableId: ").append(tableId);
- sb.append(", #").append(sequenceNumber);
- sb.append(", version: ").append(version);
- sb.append(", format: ").append(format);
- sb.append(", estimated keys: ").append(estimatedKeys);
- sb.append(", transfer size: ").append(size());
- sb.append(", compressed?: ").append(isCompressed());
- sb.append(", repairedAt: ").append(repairedAt);
- sb.append(", pendingRepair: ").append(pendingRepair);
- sb.append(", level: ").append(sstableLevel);
- sb.append(')');
- return sb.toString();
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- FileMessageHeader that = (FileMessageHeader) o;
- return sequenceNumber == that.sequenceNumber && tableId.equals(that.tableId);
- }
-
- @Override
- public int hashCode()
- {
- int result = tableId.hashCode();
- result = 31 * result + sequenceNumber;
- return result;
- }
-
- public void addSessionInfo(StreamSession session)
- {
- planId = session.planId();
- sessionIndex = session.sessionIndex();
- }
-
- static class FileMessageHeaderSerializer
- {
- public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException
- {
- header.tableId.serialize(out);
- CompactEndpointSerializationHelper.streamingInstance.serialize(header.sender, out, version);
- UUIDSerializer.serializer.serialize(header.planId, out, version);
- out.writeInt(header.sessionIndex);
- out.writeInt(header.sequenceNumber);
- out.writeUTF(header.version.toString());
- out.writeUTF(header.format.name);
-
- out.writeLong(header.estimatedKeys);
- out.writeInt(header.sections.size());
- for (Pair<Long, Long> section : header.sections)
- {
- out.writeLong(section.left);
- out.writeLong(section.right);
- }
- // construct CompressionInfo here to avoid holding large number of Chunks on heap.
- CompressionInfo compressionInfo = null;
- if (header.compressionMetadata != null)
- compressionInfo = new CompressionInfo(header.compressionMetadata.getChunksForSections(header.sections), header.compressionMetadata.parameters);
- CompressionInfo.serializer.serialize(compressionInfo, out, version);
- out.writeLong(header.repairedAt);
- out.writeBoolean(header.pendingRepair != null);
- if (header.pendingRepair != null)
- {
- UUIDSerializer.serializer.serialize(header.pendingRepair, out, version);
- }
- out.writeInt(header.sstableLevel);
-
- SerializationHeader.serializer.serialize(header.version, header.header, out);
- return compressionInfo;
- }
-
- public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException
- {
- TableId tableId = TableId.deserialize(in);
- InetAddressAndPort sender = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version);
- UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
- int sessionIndex = in.readInt();
- int sequenceNumber = in.readInt();
- Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF());
- SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF());
-
- long estimatedKeys = in.readLong();
- int count = in.readInt();
- List<Pair<Long, Long>> sections = new ArrayList<>(count);
- for (int k = 0; k < count; k++)
- sections.add(Pair.create(in.readLong(), in.readLong()));
- CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version);
- long repairedAt = in.readLong();
- UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
- int sstableLevel = in.readInt();
- SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in);
-
- return new FileMessageHeader(tableId, sender, planId, sessionIndex, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, pendingRepair, sstableLevel, header);
- }
-
- public long serializedSize(FileMessageHeader header, int version)
- {
- long size = header.tableId.serializedSize();
- size += CompactEndpointSerializationHelper.streamingInstance.serializedSize(header.sender, version);
- size += UUIDSerializer.serializer.serializedSize(header.planId, version);
- size += TypeSizes.sizeof(header.sessionIndex);
- size += TypeSizes.sizeof(header.sequenceNumber);
- size += TypeSizes.sizeof(header.version.toString());
- size += TypeSizes.sizeof(header.format.name);
- size += TypeSizes.sizeof(header.estimatedKeys);
-
- size += TypeSizes.sizeof(header.sections.size());
- for (Pair<Long, Long> section : header.sections)
- {
- size += TypeSizes.sizeof(section.left);
- size += TypeSizes.sizeof(section.right);
- }
- size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
- size += TypeSizes.sizeof(header.repairedAt);
- size += TypeSizes.sizeof(header.pendingRepair != null);
- size += header.pendingRepair != null ? UUIDSerializer.serializer.serializedSize(header.pendingRepair, version) : 0;
- size += TypeSizes.sizeof(header.sstableLevel);
-
- size += SerializationHeader.serializer.serializedSize(header.version, header.header);
-
- return size;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
deleted file mode 100644
index 9f43982..0000000
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.messages;
-
-import java.io.IOException;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.util.DataInputPlus;
-
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.streaming.StreamManager;
-import org.apache.cassandra.streaming.StreamReader;
-import org.apache.cassandra.streaming.StreamReceiveException;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.compress.CompressedStreamReader;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-
-
-/**
- * IncomingFileMessage is used to receive the part(or whole) of a SSTable data file.
- */
-public class IncomingFileMessage extends StreamMessage
-{
- public static Serializer<IncomingFileMessage> serializer = new Serializer<IncomingFileMessage>()
- {
- @SuppressWarnings("resource")
- public IncomingFileMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException
- {
- FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version);
- session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex);
- if (session == null)
- throw new IllegalStateException(String.format("unknown stream session: %s - %d", header.planId, header.sessionIndex));
- ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(header.tableId);
- if (cfs == null)
- throw new StreamReceiveException(session, "CF " + header.tableId + " was dropped during streaming");
-
- StreamReader reader = !header.isCompressed() ? new StreamReader(header, session)
- : new CompressedStreamReader(header, session);
-
- try
- {
- return new IncomingFileMessage(reader.read(input), header);
- }
- catch (Throwable t)
- {
- JVMStabilityInspector.inspectThrowable(t);
- throw new StreamReceiveException(session, t);
- }
- }
-
- public void serialize(IncomingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session)
- {
- throw new UnsupportedOperationException("Not allowed to call serialize on an incoming file");
- }
-
- public long serializedSize(IncomingFileMessage message, int version)
- {
- throw new UnsupportedOperationException("Not allowed to call serializedSize on an incoming file");
- }
- };
-
- public FileMessageHeader header;
- public SSTableMultiWriter sstable;
-
- public IncomingFileMessage(SSTableMultiWriter sstable, FileMessageHeader header)
- {
- super(Type.FILE);
- this.header = header;
- this.sstable = sstable;
- }
-
- @Override
- public String toString()
- {
- String filename = sstable != null ? sstable.getFilename() : null;
- return "File (" + header + ", file: " + filename + ")";
- }
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
new file mode 100644
index 0000000..e17c3ab
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.util.DataInputPlus;
+
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.streaming.IncomingStream;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamReceiveException;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+
+public class IncomingStreamMessage extends StreamMessage
+{
+ public static Serializer<IncomingStreamMessage> serializer = new Serializer<IncomingStreamMessage>()
+ {
+ @SuppressWarnings("resource")
+ public IncomingStreamMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException
+ {
+ StreamMessageHeader header = StreamMessageHeader.serializer.deserialize(input, version);
+ session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex);
+ if (session == null)
+ throw new IllegalStateException(String.format("unknown stream session: %s - %d", header.planId, header.sessionIndex));
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(header.tableId);
+ if (cfs == null)
+ throw new StreamReceiveException(session, "CF " + header.tableId + " was dropped during streaming");
+
+ IncomingStream incomingData = cfs.getStreamManager().prepareIncomingStream(session, header);
+ incomingData.read(input, version);
+
+ try
+ {
+ return new IncomingStreamMessage(incomingData, header);
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ throw new StreamReceiveException(session, t);
+ }
+ }
+
+ public void serialize(IncomingStreamMessage message, DataOutputStreamPlus out, int version, StreamSession session)
+ {
+ throw new UnsupportedOperationException("Not allowed to call serialize on an incoming stream");
+ }
+
+ public long serializedSize(IncomingStreamMessage message, int version)
+ {
+ throw new UnsupportedOperationException("Not allowed to call serializedSize on an incoming stream");
+ }
+ };
+
+ public StreamMessageHeader header;
+ public IncomingStream stream;
+
+ public IncomingStreamMessage(IncomingStream stream, StreamMessageHeader header)
+ {
+ super(Type.STREAM);
+ this.stream = stream;
+ this.header = header;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "IncomingStreamMessage{" +
+ "header=" + header +
+ ", stream=" + stream +
+ '}';
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ IncomingStreamMessage that = (IncomingStreamMessage) o;
+ return Objects.equals(header, that.header) &&
+ Objects.equals(stream, that.stream);
+ }
+
+ public int hashCode()
+ {
+
+ return Objects.hash(header, stream);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
deleted file mode 100644
index 8bbcc05..0000000
--- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.messages;
-
-import java.io.IOException;
-import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.StreamWriter;
-import org.apache.cassandra.streaming.compress.CompressedStreamWriter;
-import org.apache.cassandra.streaming.compress.CompressionInfo;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.Ref;
-
-/**
- * OutgoingFileMessage is used to transfer the part(or whole) of a SSTable data file.
- */
-public class OutgoingFileMessage extends StreamMessage
-{
- public static Serializer<OutgoingFileMessage> serializer = new Serializer<OutgoingFileMessage>()
- {
- public OutgoingFileMessage deserialize(DataInputPlus in, int version, StreamSession session)
- {
- throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing file");
- }
-
- public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
- {
- message.startTransfer();
- try
- {
- message.serialize(out, version, session);
- session.fileSent(message.header);
- }
- finally
- {
- message.finishTransfer();
- }
- }
-
- public long serializedSize(OutgoingFileMessage message, int version)
- {
- return 0;
- }
- };
-
- public final FileMessageHeader header;
- private final Ref<SSTableReader> ref;
- private final String filename;
- private boolean completed = false;
- private boolean transferring = false;
-
- public OutgoingFileMessage(Ref<SSTableReader> ref, StreamSession session, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, boolean keepSSTableLevel)
- {
- super(Type.FILE);
- this.ref = ref;
-
- SSTableReader sstable = ref.get();
- filename = sstable.getFilename();
- this.header = new FileMessageHeader(sstable.metadata().id,
- FBUtilities.getBroadcastAddressAndPort(),
- session.planId(),
- session.sessionIndex(),
- sequenceNumber,
- sstable.descriptor.version,
- sstable.descriptor.formatType,
- estimatedKeys,
- sections,
- sstable.compression ? sstable.getCompressionMetadata() : null,
- sstable.getRepairedAt(),
- sstable.getPendingRepair(),
- keepSSTableLevel ? sstable.getSSTableLevel() : 0,
- sstable.header.toComponent());
- }
-
- public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException
- {
- if (completed)
- {
- return;
- }
-
- CompressionInfo compressionInfo = FileMessageHeader.serializer.serialize(header, out, version);
- out.flush();
- final SSTableReader reader = ref.get();
- StreamWriter writer = compressionInfo == null ?
- new StreamWriter(reader, header.sections, session) :
- new CompressedStreamWriter(reader, header.sections,
- compressionInfo, session);
- writer.write(out);
- }
-
- @VisibleForTesting
- public synchronized void finishTransfer()
- {
- transferring = false;
- //session was aborted mid-transfer, now it's safe to release
- if (completed)
- {
- ref.release();
- }
- }
-
- @VisibleForTesting
- public synchronized void startTransfer()
- {
- if (completed)
- throw new RuntimeException(String.format("Transfer of file %s already completed or aborted (perhaps session failed?).",
- filename));
- transferring = true;
- }
-
- public synchronized void complete()
- {
- if (!completed)
- {
- completed = true;
- //release only if not transferring
- if (!transferring)
- {
- ref.release();
- }
- }
- }
-
- @Override
- public String toString()
- {
- return "File (" + header + ", file: " + filename + ")";
- }
-
- public String getFilename()
- {
- return filename;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
new file mode 100644
index 0000000..263aabd
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.OutgoingStream;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class OutgoingStreamMessage extends StreamMessage
+{
+ public static Serializer<OutgoingStreamMessage> serializer = new Serializer<OutgoingStreamMessage>()
+ {
+ public OutgoingStreamMessage deserialize(DataInputPlus in, int version, StreamSession session)
+ {
+ throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing stream");
+ }
+
+ public void serialize(OutgoingStreamMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException
+ {
+ message.startTransfer();
+ try
+ {
+ message.serialize(out, version, session);
+ session.streamSent(message);
+ }
+ finally
+ {
+ message.finishTransfer();
+ }
+ }
+
+ public long serializedSize(OutgoingStreamMessage message, int version)
+ {
+ return 0;
+ }
+ };
+
+ public final StreamMessageHeader header;
+ private final TableId tableId;
+ public final OutgoingStream stream;
+ private boolean completed = false;
+ private boolean transferring = false;
+
+ public OutgoingStreamMessage(TableId tableId, StreamSession session, OutgoingStream stream, int sequenceNumber)
+ {
+ super(Type.STREAM);
+ this.tableId = tableId;
+
+ this.stream = stream;
+ this.header = new StreamMessageHeader(tableId,
+ FBUtilities.getBroadcastAddressAndPort(),
+ session.planId(),
+ session.sessionIndex(),
+ sequenceNumber,
+ stream.getRepairedAt(),
+ stream.getPendingRepair());
+ }
+
+ public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException
+ {
+ if (completed)
+ {
+ return;
+ }
+ StreamMessageHeader.serializer.serialize(header, out, version);
+ stream.write(session, out, version);
+ }
+
+ @VisibleForTesting
+ public synchronized void finishTransfer()
+ {
+ transferring = false;
+ //session was aborted mid-transfer, now it's safe to release
+ if (completed)
+ {
+ stream.finish();
+ }
+ }
+
+ @VisibleForTesting
+ public synchronized void startTransfer()
+ {
+ if (completed)
+ throw new RuntimeException(String.format("Transfer of stream %s already completed or aborted (perhaps session failed?).",
+ stream));
+ transferring = true;
+ }
+
+ public synchronized void complete()
+ {
+ if (!completed)
+ {
+ completed = true;
+ //release only if not transferring
+ if (!transferring)
+ {
+ stream.finish();
+ }
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "OutgoingStreamMessage{" +
+ "header=" + header +
+ ", stream=" + stream +
+ '}';
+ }
+
+ public String getName()
+ {
+ return stream.getName();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index fced133..fbd3e21 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -44,18 +44,16 @@ public class StreamInitMessage extends StreamMessage
public final UUID planId;
public final StreamOperation streamOperation;
- public final boolean keepSSTableLevel;
public final UUID pendingRepair;
public final PreviewKind previewKind;
- public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
+ public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, UUID pendingRepair, PreviewKind previewKind)
{
super(Type.STREAM_INIT);
this.from = from;
this.sessionIndex = sessionIndex;
this.planId = planId;
this.streamOperation = streamOperation;
- this.keepSSTableLevel = keepSSTableLevel;
this.pendingRepair = pendingRepair;
this.previewKind = previewKind;
}
@@ -77,7 +75,6 @@ public class StreamInitMessage extends StreamMessage
out.writeInt(message.sessionIndex);
UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version);
out.writeUTF(message.streamOperation.getDescription());
- out.writeBoolean(message.keepSSTableLevel);
out.writeBoolean(message.pendingRepair != null);
if (message.pendingRepair != null)
@@ -93,11 +90,10 @@ public class StreamInitMessage extends StreamMessage
int sessionIndex = in.readInt();
UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
String description = in.readUTF();
- boolean keepSSTableLevel = in.readBoolean();
UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
- return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), keepSSTableLevel, pendingRepair, previewKind);
+ return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), pendingRepair, previewKind);
}
public long serializedSize(StreamInitMessage message, int version)
@@ -106,7 +102,6 @@ public class StreamInitMessage extends StreamMessage
size += TypeSizes.sizeof(message.sessionIndex);
size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version);
size += TypeSizes.sizeof(message.streamOperation.getDescription());
- size += TypeSizes.sizeof(message.keepSSTableLevel);
size += TypeSizes.sizeof(message.pendingRepair != null);
if (message.pendingRepair != null)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index feeab05..7ab3d34 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -67,7 +67,7 @@ public abstract class StreamMessage
public enum Type
{
PREPARE_SYN(1, 5, PrepareSynMessage.serializer),
- FILE(2, 0, IncomingFileMessage.serializer, OutgoingFileMessage.serializer),
+ STREAM(2, 0, IncomingStreamMessage.serializer, OutgoingStreamMessage.serializer),
RECEIVED(3, 4, ReceivedMessage.serializer),
COMPLETE(5, 1, CompleteMessage.serializer),
SESSION_FAILED(6, 5, SessionFailedMessage.serializer),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java
new file mode 100644
index 0000000..84cf3a3
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * StreamingFileHeader is appended before sending actual data to describe what it's sending.
+ */
+public class StreamMessageHeader
+{
+ public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer();
+
+ public final TableId tableId;
+ public UUID planId;
+ public int sessionIndex;
+ public final int sequenceNumber;
+ public final long repairedAt;
+ public final UUID pendingRepair;
+ public final InetAddressAndPort sender;
+
+ public StreamMessageHeader(TableId tableId,
+ InetAddressAndPort sender,
+ UUID planId,
+ int sessionIndex,
+ int sequenceNumber,
+ long repairedAt,
+ UUID pendingRepair)
+ {
+ this.tableId = tableId;
+ this.sender = sender;
+ this.planId = planId;
+ this.sessionIndex = sessionIndex;
+ this.sequenceNumber = sequenceNumber;
+ this.repairedAt = repairedAt;
+ this.pendingRepair = pendingRepair;
+ }
+
+ @Override
+ public String toString()
+ {
+ final StringBuilder sb = new StringBuilder("Header (");
+ sb.append("tableId: ").append(tableId);
+ sb.append(", #").append(sequenceNumber);
+ sb.append(", repairedAt: ").append(repairedAt);
+ sb.append(", pendingRepair: ").append(pendingRepair);
+ sb.append(')');
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ StreamMessageHeader that = (StreamMessageHeader) o;
+ return sequenceNumber == that.sequenceNumber && tableId.equals(that.tableId);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = tableId.hashCode();
+ result = 31 * result + sequenceNumber;
+ return result;
+ }
+
+ public void addSessionInfo(StreamSession session)
+ {
+ planId = session.planId();
+ sessionIndex = session.sessionIndex();
+ }
+
+ static class FileMessageHeaderSerializer
+ {
+ public void serialize(StreamMessageHeader header, DataOutputPlus out, int version) throws IOException
+ {
+ header.tableId.serialize(out);
+ CompactEndpointSerializationHelper.streamingInstance.serialize(header.sender, out, version);
+ UUIDSerializer.serializer.serialize(header.planId, out, version);
+ out.writeInt(header.sessionIndex);
+ out.writeInt(header.sequenceNumber);
+ out.writeLong(header.repairedAt);
+ out.writeBoolean(header.pendingRepair != null);
+ if (header.pendingRepair != null)
+ {
+ UUIDSerializer.serializer.serialize(header.pendingRepair, out, version);
+ }
+ }
+
+ public StreamMessageHeader deserialize(DataInputPlus in, int version) throws IOException
+ {
+ TableId tableId = TableId.deserialize(in);
+ InetAddressAndPort sender = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version);
+ UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version);
+ int sessionIndex = in.readInt();
+ int sequenceNumber = in.readInt();
+ long repairedAt = in.readLong();
+ UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
+
+ return new StreamMessageHeader(tableId, sender, planId, sessionIndex, sequenceNumber, repairedAt, pendingRepair);
+ }
+
+ public long serializedSize(StreamMessageHeader header, int version)
+ {
+ long size = header.tableId.serializedSize();
+ size += CompactEndpointSerializationHelper.streamingInstance.serializedSize(header.sender, version);
+ size += UUIDSerializer.serializer.serializedSize(header.planId, version);
+ size += TypeSizes.sizeof(header.sessionIndex);
+ size += TypeSizes.sizeof(header.sequenceNumber);
+ size += TypeSizes.sizeof(header.repairedAt);
+ size += TypeSizes.sizeof(header.pendingRepair != null);
+ size += header.pendingRepair != null ? UUIDSerializer.serializer.serializedSize(header.pendingRepair, version) : 0;
+
+ return size;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
new file mode 100644
index 0000000..289bb0f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.util.ArrayList;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.serializers.SerializationUtils;
+
+public class CassandraStreamHeaderTest
+{
+ @Test
+ public void serializerTest()
+ {
+ String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)";
+ TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build();
+ CassandraStreamHeader header = new CassandraStreamHeader(BigFormat.latestVersion,
+ SSTableFormat.Type.BIG,
+ 0,
+ new ArrayList<>(),
+ ((CompressionMetadata) null),
+ 0,
+ SerializationHeader.makeWithoutStats(metadata).toComponent());
+
+ SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
new file mode 100644
index 0000000..8497e71
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.OutgoingStream;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamConnectionFactory;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+
+public class CassandraStreamManagerTest
+{
+ private static final String KEYSPACE = null;
+ private String keyspace = null;
+ private static final String table = "tbl";
+ private static final StreamConnectionFactory connectionFactory = new DefaultConnectionFactory();
+
+ private TableMetadata tbm;
+ private ColumnFamilyStore cfs;
+
+ @BeforeClass
+ public static void setupClass() throws Exception
+ {
+ SchemaLoader.prepareServer();
+ }
+
+ @Before
+ public void createKeyspace() throws Exception
+ {
+ keyspace = String.format("ks_%s", System.currentTimeMillis());
+ tbm = CreateTableStatement.parse(String.format("CREATE TABLE %s (k INT PRIMARY KEY, v INT)", table), keyspace).build();
+ SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), tbm);
+ cfs = Schema.instance.getColumnFamilyStoreInstance(tbm.id);
+ }
+
+ private static StreamSession session(UUID pendingRepair)
+ {
+ try
+ {
+ return new StreamSession(StreamOperation.REPAIR,
+ InetAddressAndPort.getByName("127.0.0.1"),
+ InetAddressAndPort.getByName("127.0.0.2"),
+ connectionFactory,
+ 0,
+ pendingRepair,
+ PreviewKind.NONE);
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ private SSTableReader createSSTable(Runnable queryable)
+ {
+ Set<SSTableReader> before = cfs.getLiveSSTables();
+ queryable.run();
+ cfs.forceBlockingFlush();
+ Set<SSTableReader> after = cfs.getLiveSSTables();
+
+ Set<SSTableReader> diff = Sets.difference(after, before);
+ return Iterables.getOnlyElement(diff);
+ }
+
+ private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException
+ {
+ Descriptor descriptor = sstable.descriptor;
+ descriptor.getMetadataSerializer().mutateRepaired(descriptor, repairedAt, pendingRepair);
+ sstable.reloadSSTableMetadata();
+
+ }
+
+ private static Set<SSTableReader> sstablesFromStreams(Collection<OutgoingStream> streams)
+ {
+ Set<SSTableReader> sstables = new HashSet<>();
+ for (OutgoingStream stream: streams)
+ {
+ Ref<SSTableReader> ref = CassandraOutgoingFile.fromStream(stream).getRef();
+ sstables.add(ref.get());
+ ref.release();
+ }
+ return sstables;
+ }
+
+ private Set<SSTableReader> getReadersForRange(Range<Token> range)
+ {
+ Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(NO_PENDING_REPAIR),
+ Collections.singleton(range),
+ NO_PENDING_REPAIR,
+ PreviewKind.NONE);
+ return sstablesFromStreams(streams);
+ }
+
+ private Set<SSTableReader> selectReaders(UUID pendingRepair)
+ {
+ IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+ Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
+ Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(pendingRepair), ranges, pendingRepair, PreviewKind.NONE);
+ return sstablesFromStreams(streams);
+ }
+
+ @Test
+ public void incrementalSSTableSelection() throws Exception
+ {
+ // make 3 tables, 1 unrepaired, 2 pending repair with different repair ids, and 1 repaired
+ SSTableReader sstable1 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (1, 1)", keyspace, table)));
+ SSTableReader sstable2 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (2, 2)", keyspace, table)));
+ SSTableReader sstable3 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (3, 3)", keyspace, table)));
+ SSTableReader sstable4 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (4, 4)", keyspace, table)));
+
+
+ UUID pendingRepair = UUIDGen.getTimeUUID();
+ long repairedAt = System.currentTimeMillis();
+ mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair);
+ mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+ mutateRepaired(sstable4, repairedAt, NO_PENDING_REPAIR);
+
+
+
+ // no pending repair should return all sstables
+ Assert.assertEquals(Sets.newHashSet(sstable1, sstable2, sstable3, sstable4), selectReaders(NO_PENDING_REPAIR));
+
+ // a pending repair arg should only return sstables with the same pending repair id
+ Assert.assertEquals(Sets.newHashSet(sstable2), selectReaders(pendingRepair));
+ }
+
+ @Test
+ public void testSSTableSectionsForRanges() throws Exception
+ {
+ cfs.truncateBlocking();
+
+ createSSTable(() -> {
+ QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (1, 1)", keyspace, table));
+ QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (2, 2)", keyspace, table));
+ });
+
+ Collection<SSTableReader> allSSTables = cfs.getLiveSSTables();
+ Assert.assertEquals(1, allSSTables.size());
+ final Token firstToken = allSSTables.iterator().next().first.getToken();
+ DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
+
+ Set<SSTableReader> sstablesBeforeRewrite = getReadersForRange(new Range<>(firstToken, firstToken));
+ Assert.assertEquals(1, sstablesBeforeRewrite.size());
+ final AtomicInteger checkCount = new AtomicInteger();
+ // needed since we get notified when compaction is done as well - we can't get sections for ranges for obsoleted sstables
+ final AtomicBoolean done = new AtomicBoolean(false);
+ final AtomicBoolean failed = new AtomicBoolean(false);
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ while (!done.get())
+ {
+ Range<Token> range = new Range<Token>(firstToken, firstToken);
+ Set<SSTableReader> sstables = getReadersForRange(range);
+ if (sstables.size() != 1)
+ failed.set(true);
+ checkCount.incrementAndGet();
+ Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
+ }
+ }
+ };
+ Thread t = NamedThreadFactory.createThread(r);
+ try
+ {
+ t.start();
+ cfs.forceMajorCompaction();
+ // reset
+ }
+ finally
+ {
+ DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50);
+ done.set(true);
+ t.join(20);
+ }
+ Assert.assertFalse(failed.get());
+ Assert.assertTrue(checkCount.get() >= 2);
+ cfs.truncateBlocking();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index ad5f8f5..8f0a407 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.DefaultConnectionFactory;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.FBUtilities;
@@ -51,7 +52,7 @@ public class StreamStateStoreTest
Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100"));
InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
- StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null, PreviewKind.NONE);
+ StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, local, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf"));
StreamStateStore store = new StreamStateStore();
@@ -72,7 +73,7 @@ public class StreamStateStoreTest
// add different range within the same keyspace
Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200"));
- session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null, PreviewKind.NONE);
+ session = new StreamSession(StreamOperation.BOOTSTRAP, local, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf"));
session.state(StreamSession.State.COMPLETE);
store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index e399c67..8dd8197 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Random;
import java.util.UUID;
+import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -42,6 +43,7 @@ import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.Verifier;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -52,6 +54,7 @@ import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.OutgoingStream;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamOperation;
@@ -248,12 +251,11 @@ public class LegacySSTableTest
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100"))));
ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken()));
- ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
- details.add(new StreamSession.SSTableStreamingSections(sstable.ref(),
- sstable.getPositionsForRanges(ranges),
- sstable.estimatedKeysForRanges(ranges)));
- new StreamPlan(StreamOperation.OTHER).transferFiles(FBUtilities.getBroadcastAddressAndPort(), details)
- .execute().get();
+ List<OutgoingStream> streams = Lists.newArrayList(new CassandraOutgoingFile(StreamOperation.OTHER,
+ sstable.ref(),
+ sstable.getPositionsForRanges(ranges),
+ sstable.estimatedKeysForRanges(ranges)));
+ new StreamPlan(StreamOperation.OTHER).transferStreams(FBUtilities.getBroadcastAddressAndPort(), streams).execute().get();
}
private static void truncateLegacyTables(String legacyVersion) throws Exception
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 53f5ab3..5c3e8c9 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -774,65 +774,6 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
validateCFS(cfs);
}
- @Test
- public void testSSTableSectionsForRanges() throws IOException, InterruptedException, ExecutionException
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE);
- final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
- truncate(cfs);
-
- cfs.addSSTable(writeFile(cfs, 1000));
-
- Collection<SSTableReader> allSSTables = cfs.getLiveSSTables();
- assertEquals(1, allSSTables.size());
- final Token firstToken = allSSTables.iterator().next().first.getToken();
- DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1);
-
- List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges(
- Collections.singleton(new Range<Token>(firstToken, firstToken)),
- Collections.singleton(cfs), null, PreviewKind.NONE);
- assertEquals(1, sectionsBeforeRewrite.size());
- for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite)
- section.ref.release();
- final AtomicInteger checkCount = new AtomicInteger();
- // needed since we get notified when compaction is done as well - we can't get sections for ranges for obsoleted sstables
- final AtomicBoolean done = new AtomicBoolean(false);
- final AtomicBoolean failed = new AtomicBoolean(false);
- Runnable r = new Runnable()
- {
- public void run()
- {
- while (!done.get())
- {
- Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken));
- List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), null, PreviewKind.NONE);
- if (sections.size() != 1)
- failed.set(true);
- for (StreamSession.SSTableStreamingSections section : sections)
- section.ref.release();
- checkCount.incrementAndGet();
- Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
- }
- }
- };
- Thread t = NamedThreadFactory.createThread(r);
- try
- {
- t.start();
- cfs.forceMajorCompaction();
- // reset
- }
- finally
- {
- DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50);
- done.set(true);
- t.join(20);
- }
- assertFalse(failed.get());
- assertTrue(checkCount.get() >= 2);
- truncate(cfs);
- }
-
/**
* emulates anticompaction - writing from one source sstable to two new sstables
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/serializers/SerializationUtils.java b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
new file mode 100644
index 0000000..7ce4ec5
--- /dev/null
+++ b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.serializers;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+
+public class SerializationUtils
+{
+
+ public static <T> T cycleSerialization(T src, IVersionedSerializer<T> serializer, int version)
+ {
+ int expectedSize = (int) serializer.serializedSize(src, version);
+
+ try (DataOutputBuffer out = new DataOutputBuffer(expectedSize))
+ {
+ serializer.serialize(src, out, version);
+ Assert.assertEquals(expectedSize, out.buffer().limit());
+ try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
+ {
+ return serializer.deserialize(in, version);
+ }
+ }
+ catch (IOException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ public static <T> T cycleSerialization(T src, IVersionedSerializer<T> serializer)
+ {
+ return cycleSerialization(src, serializer, MessagingService.current_version);
+ }
+
+ public static <T> void assertSerializationCycle(T src, IVersionedSerializer<T> serializer, int version)
+ {
+ T dst = cycleSerialization(src, serializer, version);
+ Assert.assertEquals(src, dst);
+ }
+
+ public static <T> void assertSerializationCycle(T src, IVersionedSerializer<T> serializer)
+ {
+ assertSerializationCycle(src, serializer, MessagingService.current_version);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
deleted file mode 100644
index 6abc2a2..0000000
--- a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.streaming;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.statements.CreateTableStatement;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.UUIDGen;
-
-public class StreamSessionTest
-{
- private String keyspace = null;
- private static final String table = "tbl";
-
- private TableMetadata tbm;
- private ColumnFamilyStore cfs;
-
- @BeforeClass
- public static void setupClass() throws Exception
- {
- SchemaLoader.prepareServer();
- }
-
- @Before
- public void createKeyspace() throws Exception
- {
- keyspace = String.format("ks_%s", System.currentTimeMillis());
- tbm = CreateTableStatement.parse(String.format("CREATE TABLE %s (k INT PRIMARY KEY, v INT)", table), keyspace).build();
- SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), tbm);
- cfs = Schema.instance.getColumnFamilyStoreInstance(tbm.id);
- }
-
- private SSTableReader createSSTable(Runnable queryable)
- {
- Set<SSTableReader> before = cfs.getLiveSSTables();
- queryable.run();
- cfs.forceBlockingFlush();
- Set<SSTableReader> after = cfs.getLiveSSTables();
-
- Set<SSTableReader> diff = Sets.difference(after, before);
- assert diff.size() == 1 : "Expected 1 new sstable, got " + diff.size();
- return diff.iterator().next();
- }
-
- private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException
- {
- Descriptor descriptor = sstable.descriptor;
- descriptor.getMetadataSerializer().mutateRepaired(descriptor, repairedAt, pendingRepair);
- sstable.reloadSSTableMetadata();
-
- }
-
- private Set<SSTableReader> selectReaders(UUID pendingRepair)
- {
- IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
- Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
- List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(ranges,
- Lists.newArrayList(cfs),
- pendingRepair,
- PreviewKind.NONE);
- Set<SSTableReader> sstables = new HashSet<>();
- for (StreamSession.SSTableStreamingSections section: sections)
- {
- sstables.add(section.ref.get());
- }
- return sstables;
- }
-
- @Test
- public void incrementalSSTableSelection() throws Exception
- {
- // make 3 tables, 1 unrepaired, 2 pending repair with different repair ids, and 1 repaired
- SSTableReader sstable1 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (1, 1)", keyspace, table)));
- SSTableReader sstable2 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (2, 2)", keyspace, table)));
- SSTableReader sstable3 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (3, 3)", keyspace, table)));
- SSTableReader sstable4 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (4, 4)", keyspace, table)));
-
-
- UUID pendingRepair = UUIDGen.getTimeUUID();
- long repairedAt = System.currentTimeMillis();
- mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair);
- mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
- mutateRepaired(sstable4, repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
-
- // no pending repair should return all sstables
- Assert.assertEquals(Sets.newHashSet(sstable1, sstable2, sstable3, sstable4), selectReaders(ActiveRepairService.NO_PENDING_REPAIR));
-
- // a pending repair arg should only return sstables with the same pending repair id
- Assert.assertEquals(Sets.newHashSet(sstable2), selectReaders(pendingRepair));
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index ceaaae0..45c917a 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -36,6 +36,7 @@ import junit.framework.Assert;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -43,7 +44,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
+import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Ref;
@@ -75,7 +76,7 @@ public class StreamTransferTaskTest
public void testScheduleTimeout() throws Exception
{
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
- StreamSession session = new StreamSession(peer, peer, (connectionId, protocolVersion) -> new EmbeddedChannel(), 0, true, UUID.randomUUID(), PreviewKind.ALL);
+ StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, peer, (connectionId, protocolVersion) -> new EmbeddedChannel(), 0, UUID.randomUUID(), PreviewKind.ALL);
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
// create two sstables
@@ -91,7 +92,7 @@ public class StreamTransferTaskTest
{
List<Range<Token>> ranges = new ArrayList<>();
ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
- task.addTransferFile(sstable.selfRef(), 1, sstable.getPositionsForRanges(ranges));
+ task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.selfRef(), sstable.getPositionsForRanges(ranges), 1));
}
assertEquals(2, task.getTotalNumberOfFiles());
@@ -121,9 +122,9 @@ public class StreamTransferTaskTest
public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
{
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
- StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, new DefaultConnectionFactory(), false, null, PreviewKind.NONE);
+ StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE);
StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), streamCoordinator);
- StreamSession session = new StreamSession(peer, peer, null, 0, true, null, PreviewKind.NONE);
+ StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, peer, null, 0, null, PreviewKind.NONE);
session.init(future);
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
@@ -143,7 +144,7 @@ public class StreamTransferTaskTest
ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
Ref<SSTableReader> ref = sstable.selfRef();
refs.add(ref);
- task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges));
+ task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref, sstable.getPositionsForRanges(ranges), 1));
}
assertEquals(2, task.getTotalNumberOfFiles());
@@ -151,10 +152,10 @@ public class StreamTransferTaskTest
session.transfers.put(TableId.generate(), task);
//make a copy of outgoing file messages, since task is cleared when it's aborted
- Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+ Collection<OutgoingStreamMessage> files = new LinkedList<>(task.streams.values());
//simulate start transfer
- for (OutgoingFileMessage file : files)
+ for (OutgoingStreamMessage file : files)
{
file.startTransfer();
}
@@ -169,7 +170,7 @@ public class StreamTransferTaskTest
}
//simulate finish transfer
- for (OutgoingFileMessage file : files)
+ for (OutgoingStreamMessage file : files)
{
file.finishTransfer();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 16c07a0..575200a 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -34,6 +34,7 @@ import junit.framework.Assert;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -254,13 +255,13 @@ public class StreamingTransferTest
private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
{
- StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
+ StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferStreams(LOCAL, makeOutgoingStreams(ranges, Refs.tryRef(Arrays.asList(sstable))));
streamPlan.execute().get();
//cannot add files after stream session is finished
try
{
- streamPlan.transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
+ streamPlan.transferStreams(LOCAL, makeOutgoingStreams(ranges, Refs.tryRef(Arrays.asList(sstable))));
fail("Should have thrown exception");
}
catch (RuntimeException e)
@@ -269,16 +270,22 @@ public class StreamingTransferTest
}
}
- private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> ranges, Refs<SSTableReader> sstables)
+ private Collection<OutgoingStream> makeOutgoingStreams(StreamOperation operation, List<Range<Token>> ranges, Refs<SSTableReader> sstables)
{
- ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>();
+ ArrayList<OutgoingStream> streams = new ArrayList<>();
for (SSTableReader sstable : sstables)
{
- details.add(new StreamSession.SSTableStreamingSections(sstables.get(sstable),
- sstable.getPositionsForRanges(ranges),
- sstable.estimatedKeysForRanges(ranges)));
+ streams.add(new CassandraOutgoingFile(operation,
+ sstables.get(sstable),
+ sstable.getPositionsForRanges(ranges),
+ sstable.estimatedKeysForRanges(ranges)));
}
- return details;
+ return streams;
+ }
+
+ private Collection<OutgoingStream> makeOutgoingStreams(List<Range<Token>> ranges, Refs<SSTableReader> sstables)
+ {
+ return makeOutgoingStreams(StreamOperation.OTHER, ranges, sstables);
}
private void doTransferTable(boolean transferSSTables) throws Exception
@@ -458,7 +465,7 @@ public class StreamingTransferTest
// Acquiring references, transferSSTables needs it
Refs<SSTableReader> refs = Refs.tryRef(Arrays.asList(sstable, sstable2));
assert refs != null;
- new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, refs)).execute().get();
+ new StreamPlan("StreamingTransferTest").transferStreams(LOCAL, makeOutgoingStreams(ranges, refs)).execute().get();
// confirm that the sstables were transferred and registered and that 2 keys arrived
ColumnFamilyStore cfstore = Keyspace.open(keyspaceName).getColumnFamilyStore(cfname);
@@ -513,7 +520,7 @@ public class StreamingTransferTest
if (refs == null)
throw new AssertionError();
- new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, refs)).execute().get();
+ new StreamPlan("StreamingTransferTest").transferStreams(LOCAL, makeOutgoingStreams(ranges, refs)).execute().get();
// check that only two keys were transferred
for (Map.Entry<DecoratedKey,String> entry : Arrays.asList(first, last))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
index 617bae1..fd22a65 100644
--- a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
+++ b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.streaming.async;
-import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -64,8 +63,8 @@ public class NettyStreamingMessageSenderTest
channel = new EmbeddedChannel();
channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
UUID pendingRepair = UUID.randomUUID();
- session = new StreamSession(REMOTE_ADDR, REMOTE_ADDR, (connectionId, protocolVersion) -> null, 0, true, pendingRepair, PreviewKind.ALL);
- StreamResultFuture future = StreamResultFuture.initReceivingSide(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, true, pendingRepair, session.getPreviewKind());
+ session = new StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR, REMOTE_ADDR, (connectionId, protocolVersion) -> null, 0, pendingRepair, PreviewKind.ALL);
+ StreamResultFuture future = StreamResultFuture.initReceivingSide(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, pendingRepair, session.getPreviewKind());
session.init(future);
sender = session.getMessageSender();
sender.setControlMessageChannel(channel);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org