You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2017/04/24 16:24:51 UTC
[2/3] cassandra git commit: Add repair streaming preview
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index 3b13cd8..6c8ff9d 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.utils.FBUtilities;
@@ -47,6 +48,7 @@ public class RepairOption
public static final String TRACE_KEY = "trace";
public static final String SUB_RANGE_REPAIR_KEY = "sub_range_repair";
public static final String PULL_REPAIR_KEY = "pullRepair";
+ public static final String PREVIEW = "previewKind";
// we don't want to push nodes too much for repair
public static final int MAX_JOB_THREADS = 4;
@@ -136,6 +138,7 @@ public class RepairOption
RepairParallelism parallelism = RepairParallelism.fromName(options.get(PARALLELISM_KEY));
boolean primaryRange = Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY));
boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY));
+ PreviewKind previewKind = PreviewKind.valueOf(options.getOrDefault(PREVIEW, PreviewKind.NONE.toString()));
boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY));
boolean pullRepair = Boolean.parseBoolean(options.get(PULL_REPAIR_KEY));
@@ -171,7 +174,7 @@ public class RepairOption
}
}
- RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair);
+ RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, previewKind);
// data centers
String dataCentersStr = options.get(DATACENTERS_KEY);
@@ -252,13 +255,14 @@ public class RepairOption
private final int jobThreads;
private final boolean isSubrangeRepair;
private final boolean pullRepair;
+ private final PreviewKind previewKind;
private final Collection<String> columnFamilies = new HashSet<>();
private final Collection<String> dataCenters = new HashSet<>();
private final Collection<String> hosts = new HashSet<>();
private final Collection<Range<Token>> ranges = new HashSet<>();
- public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair)
+ public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, PreviewKind previewKind)
{
if (FBUtilities.isWindows &&
(DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) &&
@@ -277,6 +281,7 @@ public class RepairOption
this.ranges.addAll(ranges);
this.isSubrangeRepair = isSubrangeRepair;
this.pullRepair = pullRepair;
+ this.previewKind = previewKind;
}
public RepairParallelism getParallelism()
@@ -339,6 +344,16 @@ public class RepairOption
return isSubrangeRepair;
}
+ public PreviewKind getPreviewKind()
+ {
+ return previewKind;
+ }
+
+ public boolean isPreview()
+ {
+ return previewKind.isPreview();
+ }
+
public boolean isInLocalDCOnly() {
return dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter());
}
@@ -347,16 +362,17 @@ public class RepairOption
public String toString()
{
return "repair options (" +
- "parallelism: " + parallelism +
- ", primary range: " + primaryRange +
- ", incremental: " + incremental +
- ", job threads: " + jobThreads +
- ", ColumnFamilies: " + columnFamilies +
- ", dataCenters: " + dataCenters +
- ", hosts: " + hosts +
- ", # of ranges: " + ranges.size() +
- ", pull repair: " + pullRepair +
- ')';
+ "parallelism: " + parallelism +
+ ", primary range: " + primaryRange +
+ ", incremental: " + incremental +
+ ", job threads: " + jobThreads +
+ ", ColumnFamilies: " + columnFamilies +
+ ", dataCenters: " + dataCenters +
+ ", hosts: " + hosts +
+ ", previewKind: " + previewKind +
+ ", # of ranges: " + ranges.size() +
+ ", pull repair: " + pullRepair +
+ ')';
}
public Map<String, String> asMap()
@@ -373,6 +389,7 @@ public class RepairOption
options.put(TRACE_KEY, Boolean.toString(trace));
options.put(RANGES_KEY, Joiner.on(",").join(ranges));
options.put(PULL_REPAIR_KEY, Boolean.toString(pullRepair));
+ options.put(PREVIEW, previewKind.toString());
return options;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
index 178e710..7b68daf 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.repair.messages;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
import org.apache.cassandra.db.TypeSizes;
@@ -26,6 +28,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.repair.NodePair;
import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.streaming.SessionSummary;
/**
*
@@ -40,16 +43,20 @@ public class SyncComplete extends RepairMessage
/** true if sync success, false otherwise */
public final boolean success;
- public SyncComplete(RepairJobDesc desc, NodePair nodes, boolean success)
+ public final List<SessionSummary> summaries;
+
+ public SyncComplete(RepairJobDesc desc, NodePair nodes, boolean success, List<SessionSummary> summaries)
{
super(Type.SYNC_COMPLETE, desc);
this.nodes = nodes;
this.success = success;
+ this.summaries = summaries;
}
- public SyncComplete(RepairJobDesc desc, InetAddress endpoint1, InetAddress endpoint2, boolean success)
+ public SyncComplete(RepairJobDesc desc, InetAddress endpoint1, InetAddress endpoint2, boolean success, List<SessionSummary> summaries)
{
super(Type.SYNC_COMPLETE, desc);
+ this.summaries = summaries;
this.nodes = new NodePair(endpoint1, endpoint2);
this.success = success;
}
@@ -63,13 +70,14 @@ public class SyncComplete extends RepairMessage
return messageType == other.messageType &&
desc.equals(other.desc) &&
success == other.success &&
- nodes.equals(other.nodes);
+ nodes.equals(other.nodes) &&
+ summaries.equals(other.summaries);
}
@Override
public int hashCode()
{
- return Objects.hash(messageType, desc, success, nodes);
+ return Objects.hash(messageType, desc, success, nodes, summaries);
}
private static class SyncCompleteSerializer implements MessageSerializer<SyncComplete>
@@ -79,13 +87,28 @@ public class SyncComplete extends RepairMessage
RepairJobDesc.serializer.serialize(message.desc, out, version);
NodePair.serializer.serialize(message.nodes, out, version);
out.writeBoolean(message.success);
+
+ out.writeInt(message.summaries.size());
+ for (SessionSummary summary: message.summaries)
+ {
+ SessionSummary.serializer.serialize(summary, out, version);
+ }
}
public SyncComplete deserialize(DataInputPlus in, int version) throws IOException
{
RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
NodePair nodes = NodePair.serializer.deserialize(in, version);
- return new SyncComplete(desc, nodes, in.readBoolean());
+ boolean success = in.readBoolean();
+
+ int numSummaries = in.readInt();
+ List<SessionSummary> summaries = new ArrayList<>(numSummaries);
+ for (int i=0; i<numSummaries; i++)
+ {
+ summaries.add(SessionSummary.serializer.deserialize(in, version));
+ }
+
+ return new SyncComplete(desc, nodes, success, summaries);
}
public long serializedSize(SyncComplete message, int version)
@@ -93,6 +116,13 @@ public class SyncComplete extends RepairMessage
long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
size += NodePair.serializer.serializedSize(message.nodes, version);
size += TypeSizes.sizeof(message.success);
+
+ size += TypeSizes.sizeof(message.summaries.size());
+ for (SessionSummary summary: message.summaries)
+ {
+ size += SessionSummary.serializer.serializedSize(summary, version);
+ }
+
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index e31cc6c..01601e2 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.streaming.PreviewKind;
/**
* Body part of SYNC_REQUEST repair message.
@@ -48,14 +49,16 @@ public class SyncRequest extends RepairMessage
public final InetAddress src;
public final InetAddress dst;
public final Collection<Range<Token>> ranges;
+ public final PreviewKind previewKind;
- public SyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges)
+ public SyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges, PreviewKind previewKind)
{
super(Type.SYNC_REQUEST, desc);
this.initiator = initiator;
this.src = src;
this.dst = dst;
this.ranges = ranges;
+ this.previewKind = previewKind;
}
@Override
@@ -69,13 +72,14 @@ public class SyncRequest extends RepairMessage
initiator.equals(req.initiator) &&
src.equals(req.src) &&
dst.equals(req.dst) &&
- ranges.equals(req.ranges);
+ ranges.equals(req.ranges) &&
+ previewKind == req.previewKind;
}
@Override
public int hashCode()
{
- return Objects.hash(messageType, desc, initiator, src, dst, ranges);
+ return Objects.hash(messageType, desc, initiator, src, dst, ranges, previewKind);
}
public static class SyncRequestSerializer implements MessageSerializer<SyncRequest>
@@ -92,6 +96,7 @@ public class SyncRequest extends RepairMessage
MessagingService.validatePartitioner(range);
AbstractBounds.tokenSerializer.serialize(range, out, version);
}
+ out.writeInt(message.previewKind.getSerializationVal());
}
public SyncRequest deserialize(DataInputPlus in, int version) throws IOException
@@ -104,7 +109,8 @@ public class SyncRequest extends RepairMessage
List<Range<Token>> ranges = new ArrayList<>(rangesCount);
for (int i = 0; i < rangesCount; ++i)
ranges.add((Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version));
- return new SyncRequest(desc, owner, src, dst, ranges);
+ PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
+ return new SyncRequest(desc, owner, src, dst, ranges, previewKind);
}
public long serializedSize(SyncRequest message, int version)
@@ -114,6 +120,7 @@ public class SyncRequest extends RepairMessage
size += TypeSizes.sizeof(message.ranges.size());
for (Range<Token> range : message.ranges)
size += AbstractBounds.tokenSerializer.serializedSize(range, version);
+ size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
return size;
}
}
@@ -126,6 +133,7 @@ public class SyncRequest extends RepairMessage
", src=" + src +
", dst=" + dst +
", ranges=" + ranges +
+ ", previewKind=" + previewKind +
"} " + super.toString();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index fd98b37..aadf7c1 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -61,6 +61,7 @@ import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.RepairSession;
@@ -167,6 +168,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
Set<InetAddress> endpoints,
boolean isConsistent,
boolean pullRepair,
+ PreviewKind previewKind,
ListeningExecutorService executor,
String... cfnames)
{
@@ -176,7 +178,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
if (cfnames.length == 0)
return null;
- final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, cfnames);
+ final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, previewKind, cfnames);
sessions.put(session.getId(), session);
// register listeners
@@ -319,7 +321,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
{
// we only want repairedAt for incremental repairs, for non incremental repairs, UNREPAIRED_SSTABLE will preserve repairedAt on streamed sstables
long repairedAt = options.isIncremental() ? Clock.instance.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE;
- registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal());
+ registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());
final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
final AtomicBoolean status = new AtomicBoolean(true);
final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>());
@@ -351,7 +353,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
{
if (FailureDetector.instance.isAlive(neighbour))
{
- PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal());
+ PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());
MessageOut<RepairMessage> msg = message.createMessage();
MessagingService.instance().sendRR(msg, neighbour, callback, DatabaseDescriptor.getRpcTimeout(), true);
}
@@ -386,7 +388,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
throw new RuntimeException(errorMsg);
}
- public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal)
+ public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind)
{
assert isIncremental || repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE;
if (!registeredForEndpointChanges)
@@ -396,7 +398,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
registeredForEndpointChanges = true;
}
- parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, repairedAt, isGlobal));
+ parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, repairedAt, isGlobal, previewKind));
}
public ParentRepairSession getParentRepairSession(UUID parentSessionId)
@@ -444,7 +446,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
case SYNC_COMPLETE:
// one of replica is synced.
SyncComplete sync = (SyncComplete) message;
- session.syncComplete(desc, sync.nodes, sync.success);
+ session.syncComplete(desc, sync.nodes, sync.success, sync.summaries);
break;
default:
break;
@@ -464,8 +466,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
public final boolean isGlobal;
public final long repairedAt;
public final InetAddress coordinator;
+ public final PreviewKind previewKind;
- public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal)
+ public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind)
{
this.coordinator = coordinator;
for (ColumnFamilyStore cfs : columnFamilyStores)
@@ -476,6 +479,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
this.repairedAt = repairedAt;
this.isIncremental = isIncremental;
this.isGlobal = isGlobal;
+ this.previewKind = previewKind;
+ }
+
+ public boolean isPreview()
+ {
+ return previewKind != PreviewKind.NONE;
+ }
+
+ public Predicate<SSTableReader> getPreviewPredicate()
+ {
+ switch (previewKind)
+ {
+ case ALL:
+ return (s) -> true;
+ case REPAIRED:
+ return (s) -> s.isRepaired();
+ case UNREPAIRED:
+ return (s) -> !s.isRepaired();
+ default:
+ throw new RuntimeException("Can't get preview predicate for preview kind " + previewKind);
+ }
}
public synchronized void maybeSnapshot(TableId tableId, UUID parentSessionId)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 86340a5..5f734c9 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -64,10 +64,12 @@ public class ConnectionHandler
private IncomingMessageHandler incoming;
private OutgoingMessageHandler outgoing;
+ private final boolean isPreview;
- ConnectionHandler(StreamSession session, int incomingSocketTimeout)
+ ConnectionHandler(StreamSession session, int incomingSocketTimeout, boolean isPreview)
{
this.session = session;
+ this.isPreview = isPreview;
this.incoming = new IncomingMessageHandler(session, incomingSocketTimeout);
this.outgoing = new OutgoingMessageHandler(session);
}
@@ -142,6 +144,9 @@ public class ConnectionHandler
if (outgoing.isClosed())
throw new RuntimeException("Outgoing stream handler has been closed");
+ if (message.type == StreamMessage.Type.FILE && isPreview)
+ throw new RuntimeException("Cannot send file messages for preview streaming sessions");
+
outgoing.enqueue(message);
}
@@ -191,14 +196,14 @@ public class ConnectionHandler
@SuppressWarnings("resource")
private void sendInitMessage() throws IOException
{
- StreamInitMessage message = new StreamInitMessage(
- FBUtilities.getBroadcastAddress(),
- session.sessionIndex(),
- session.planId(),
- session.streamOperation(),
- !isOutgoingHandler,
- session.keepSSTableLevel(),
- session.getPendingRepair());
+ StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(),
+ session.sessionIndex(),
+ session.planId(),
+ session.streamOperation(),
+ !isOutgoingHandler,
+ session.keepSSTableLevel(),
+ session.getPendingRepair(),
+ session.getPreviewKind());
ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
DataOutputStreamPlus out = getWriteChannel(socket);
out.write(messageBuf);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/PreviewKind.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/PreviewKind.java b/src/java/org/apache/cassandra/streaming/PreviewKind.java
new file mode 100644
index 0000000..3b4d2a0
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/PreviewKind.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+
+import java.util.UUID;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+public enum PreviewKind
+{
+ NONE(0, null),
+ ALL(1, Predicates.alwaysTrue()),
+ UNREPAIRED(2, Predicates.not(SSTableReader::isRepaired)),
+ REPAIRED(3, SSTableReader::isRepaired);
+
+ private final int serializationVal;
+ private final Predicate<SSTableReader> streamingPredicate;
+
+ PreviewKind(int serializationVal, Predicate<SSTableReader> streamingPredicate)
+ {
+ assert ordinal() == serializationVal;
+ this.serializationVal = serializationVal;
+ this.streamingPredicate = streamingPredicate;
+ }
+
+ public int getSerializationVal()
+ {
+ return serializationVal;
+ }
+
+ public static PreviewKind deserialize(int serializationVal)
+ {
+ return values()[serializationVal];
+ }
+
+ public Predicate<SSTableReader> getStreamingPredicate()
+ {
+ return streamingPredicate;
+ }
+
+ public boolean isPreview()
+ {
+ return this != NONE;
+ }
+
+ public String logPrefix()
+ {
+ return isPreview() ? "preview repair" : "repair";
+ }
+
+ public String logPrefix(UUID sessionId)
+ {
+ return '[' + logPrefix() + " #" + sessionId.toString() + ']';
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index 3bcb20c..1521614 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -27,6 +27,8 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import org.apache.cassandra.utils.FBUtilities;
+
/**
* Stream session info.
*/
@@ -190,4 +192,9 @@ public final class SessionInfo implements Serializable
});
return Iterables.size(completed);
}
+
+ public SessionSummary createSummary()
+ {
+ return new SessionSummary(FBUtilities.getBroadcastAddress(), peer, receivingSummaries, sendingSummaries);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/SessionSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionSummary.java b/src/java/org/apache/cassandra/streaming/SessionSummary.java
new file mode 100644
index 0000000..d52c2ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/SessionSummary.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.serializers.InetAddressSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class SessionSummary
+{
+ public final InetAddress coordinator;
+ public final InetAddress peer;
+ /** Immutable collection of receiving summaries */
+ public final Collection<StreamSummary> receivingSummaries;
+ /** Immutable collection of sending summaries*/
+ public final Collection<StreamSummary> sendingSummaries;
+
+ public SessionSummary(InetAddress coordinator, InetAddress peer,
+ Collection<StreamSummary> receivingSummaries,
+ Collection<StreamSummary> sendingSummaries)
+ {
+ assert coordinator != null;
+ assert peer != null;
+ assert receivingSummaries != null;
+ assert sendingSummaries != null;
+
+ this.coordinator = coordinator;
+ this.peer = peer;
+ this.receivingSummaries = receivingSummaries;
+ this.sendingSummaries = sendingSummaries;
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ SessionSummary summary = (SessionSummary) o;
+
+ if (!coordinator.equals(summary.coordinator)) return false;
+ if (!peer.equals(summary.peer)) return false;
+ if (!receivingSummaries.equals(summary.receivingSummaries)) return false;
+ return sendingSummaries.equals(summary.sendingSummaries);
+ }
+
+ public int hashCode()
+ {
+ int result = coordinator.hashCode();
+ result = 31 * result + peer.hashCode();
+ result = 31 * result + receivingSummaries.hashCode();
+ result = 31 * result + sendingSummaries.hashCode();
+ return result;
+ }
+
+ public static IVersionedSerializer<SessionSummary> serializer = new IVersionedSerializer<SessionSummary>()
+ {
+ public void serialize(SessionSummary summary, DataOutputPlus out, int version) throws IOException
+ {
+ ByteBufferUtil.writeWithLength(InetAddressSerializer.instance.serialize(summary.coordinator), out);
+ ByteBufferUtil.writeWithLength(InetAddressSerializer.instance.serialize(summary.peer), out);
+
+ out.writeInt(summary.receivingSummaries.size());
+ for (StreamSummary streamSummary: summary.receivingSummaries)
+ {
+ StreamSummary.serializer.serialize(streamSummary, out, version);
+ }
+
+ out.writeInt(summary.sendingSummaries.size());
+ for (StreamSummary streamSummary: summary.sendingSummaries)
+ {
+ StreamSummary.serializer.serialize(streamSummary, out, version);
+ }
+ }
+
+ public SessionSummary deserialize(DataInputPlus in, int version) throws IOException
+ {
+ InetAddress coordinator = InetAddressSerializer.instance.deserialize(ByteBufferUtil.readWithLength(in));
+ InetAddress peer = InetAddressSerializer.instance.deserialize(ByteBufferUtil.readWithLength(in));
+
+ int numRcvd = in.readInt();
+ List<StreamSummary> receivingSummaries = new ArrayList<>(numRcvd);
+ for (int i=0; i<numRcvd; i++)
+ {
+ receivingSummaries.add(StreamSummary.serializer.deserialize(in, version));
+ }
+
+ int numSent = in.readInt();
+ List<StreamSummary> sendingSummaries = new ArrayList<>(numRcvd);
+ for (int i=0; i<numSent; i++)
+ {
+ sendingSummaries.add(StreamSummary.serializer.deserialize(in, version));
+ }
+
+ return new SessionSummary(coordinator, peer, receivingSummaries, sendingSummaries);
+ }
+
+ public long serializedSize(SessionSummary summary, int version)
+ {
+ long size = 0;
+ size += ByteBufferUtil.serializedSizeWithLength(InetAddressSerializer.instance.serialize(summary.coordinator));
+ size += ByteBufferUtil.serializedSizeWithLength(InetAddressSerializer.instance.serialize(summary.peer));
+
+ size += TypeSizes.sizeof(summary.receivingSummaries.size());
+ for (StreamSummary streamSummary: summary.receivingSummaries)
+ {
+ size += StreamSummary.serializer.serializedSize(streamSummary, version);
+ }
+ size += TypeSizes.sizeof(summary.sendingSummaries.size());
+ for (StreamSummary streamSummary: summary.sendingSummaries)
+ {
+ size += StreamSummary.serializer.serializedSize(streamSummary, version);
+ }
+ return size;
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 6aa34cd..9059f45 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -49,15 +49,17 @@ public class StreamCoordinator
private final boolean keepSSTableLevel;
private Iterator<StreamSession> sessionsToConnect = null;
private final UUID pendingRepair;
+ private final PreviewKind previewKind;
public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, StreamConnectionFactory factory,
- boolean connectSequentially, UUID pendingRepair)
+ boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
{
this.connectionsPerHost = connectionsPerHost;
this.factory = factory;
this.keepSSTableLevel = keepSSTableLevel;
this.connectSequentially = connectSequentially;
this.pendingRepair = pendingRepair;
+ this.previewKind = previewKind;
}
public void setConnectionFactory(StreamConnectionFactory factory)
@@ -293,7 +295,7 @@ public class StreamCoordinator
// create
if (streamSessions.size() < connectionsPerHost)
{
- StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, pendingRepair);
+ StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, pendingRepair, previewKind);
streamSessions.put(++lastReturned, session);
return session;
}
@@ -325,7 +327,7 @@ public class StreamCoordinator
StreamSession session = streamSessions.get(id);
if (session == null)
{
- session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, pendingRepair);
+ session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, pendingRepair, previewKind);
streamSessions.put(id, session);
}
return session;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index b5a6214..05a8d30 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -22,9 +22,10 @@ import java.util.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.UUIDGen;
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+
/**
* {@link StreamPlan} is a helper class that builds StreamOperation of given configuration.
*
@@ -47,20 +48,20 @@ public class StreamPlan
*/
public StreamPlan(StreamOperation streamOperation)
{
- this(streamOperation, 1, false, false, null);
+ this(streamOperation, 1, false, false, NO_PENDING_REPAIR, PreviewKind.NONE);
}
public StreamPlan(StreamOperation streamOperation, boolean keepSSTableLevels, boolean connectSequentially)
{
- this(streamOperation, 1, keepSSTableLevels, connectSequentially, null);
+ this(streamOperation, 1, keepSSTableLevels, connectSequentially, NO_PENDING_REPAIR, PreviewKind.NONE);
}
public StreamPlan(StreamOperation streamOperation, int connectionsPerHost, boolean keepSSTableLevels,
- boolean connectSequentially, UUID pendingRepair)
+ boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
{
this.streamOperation = streamOperation;
this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, new DefaultConnectionFactory(),
- connectSequentially, pendingRepair);
+ connectSequentially, pendingRepair, previewKind);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index b7e475a..34e7cc8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,6 +93,8 @@ public class StreamReceiveTask extends StreamTask
*/
public synchronized void received(SSTableMultiWriter sstable)
{
+ Preconditions.checkState(!session.isPreview(), "we should never receive sstables when previewing");
+
if (done)
{
logger.warn("[{}] Received sstable {} on already finished stream received task. Aborting sstable.", session.planId(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 7845986..67d7d0d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -71,9 +71,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
set(getCurrentState());
}
- private StreamResultFuture(UUID planId, StreamOperation streamOperation, boolean keepSSTableLevels, UUID pendingRepair)
+ private StreamResultFuture(UUID planId, StreamOperation streamOperation, boolean keepSSTableLevels, UUID pendingRepair, PreviewKind previewKind)
{
- this(planId, streamOperation, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory(), false, pendingRepair));
+ this(planId, streamOperation, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory(), false, pendingRepair, previewKind));
}
static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
@@ -107,7 +107,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
boolean isForOutgoing,
int version,
boolean keepSSTableLevel,
- UUID pendingRepair) throws IOException
+ UUID pendingRepair,
+ PreviewKind previewKind) throws IOException
{
StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
if (future == null)
@@ -115,7 +116,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, streamOperation.getDescription());
// The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
- future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, pendingRepair);
+ future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, pendingRepair, previewKind);
StreamManager.instance.registerReceiving(future);
}
future.attachConnection(from, sessionIndex, connection, isForOutgoing, version);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index adb8e79..5ca9938 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -165,6 +165,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
private final boolean keepSSTableLevel;
private ScheduledFuture<?> keepAliveFuture = null;
private final UUID pendingRepair;
+ private final PreviewKind previewKind;
public static enum State
{
@@ -181,12 +182,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
/**
* Create new streaming session with the peer.
- *
- * @param peer Address of streaming peer
+ * @param peer Address of streaming peer
* @param connecting Actual connecting address
* @param factory is used for establishing connection
*/
- public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair)
+ public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
{
this.peer = peer;
this.connecting = connecting;
@@ -194,10 +194,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
this.factory = factory;
this.handler = new ConnectionHandler(this, isKeepAliveSupported()?
(int)TimeUnit.SECONDS.toMillis(2 * DatabaseDescriptor.getStreamingKeepAlivePeriod()) :
- DatabaseDescriptor.getStreamingSocketTimeout());
+ DatabaseDescriptor.getStreamingSocketTimeout(), previewKind.isPreview());
this.metrics = StreamingMetrics.get(connecting);
this.keepSSTableLevel = keepSSTableLevel;
this.pendingRepair = pendingRepair;
+ this.previewKind = previewKind;
}
public UUID planId()
@@ -225,6 +226,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
return pendingRepair;
}
+ public boolean isPreview()
+ {
+ return previewKind.isPreview();
+ }
+
+ public PreviewKind getPreviewKind()
+ {
+ return previewKind;
+ }
+
public LifecycleTransaction getTransaction(TableId tableId)
{
assert receivers.containsKey(tableId);
@@ -314,7 +325,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
flushSSTables(stores);
List<Range<Token>> normalizedRanges = Range.normalize(ranges);
- List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, pendingRepair);
+ List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, pendingRepair, previewKind);
try
{
addTransferFiles(sections);
@@ -356,7 +367,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
@VisibleForTesting
- public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, UUID pendingRepair)
+ public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind)
{
Refs<SSTableReader> refs = new Refs<>();
try
@@ -370,7 +381,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
Set<SSTableReader> sstables = Sets.newHashSet();
SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL));
Predicate<SSTableReader> predicate;
- if (pendingRepair == ActiveRepairService.NO_PENDING_REPAIR)
+ if (previewKind.isPreview())
+ {
+ predicate = previewKind.getStreamingPredicate();
+ }
+ else if (pendingRepair == ActiveRepairService.NO_PENDING_REPAIR)
{
predicate = Predicates.alwaysTrue();
}
@@ -620,6 +635,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber
handler.sendMessage(prepare);
}
+ if (isPreview())
+ {
+ completePreview();
+ return;
+ }
+
// if there are files to stream
if (!maybeCompleted())
startStreamingFiles();
@@ -650,6 +671,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
*/
public void receive(IncomingFileMessage message)
{
+ if (isPreview())
+ {
+ throw new RuntimeException("Cannot receive files for preview session");
+ }
+
long headerSize = message.header.size();
StreamingMetrics.totalIncomingBytes.inc(headerSize);
metrics.incomingBytes.inc(headerSize);
@@ -753,6 +779,22 @@ public class StreamSession implements IEndpointStateChangeSubscriber
closeSession(State.FAILED);
}
+ private void completePreview()
+ {
+ try
+ {
+ state(State.WAIT_COMPLETE);
+ closeSession(State.COMPLETE);
+ }
+ finally
+ {
+ // aborting the tasks here needs to be the last thing we do so that we
+ // accurately report expected streaming, but don't leak any sstable refs
+ for (StreamTask task : Iterables.concat(receivers.values(), transfers.values()))
+ task.abort();
+ }
+ }
+
private boolean maybeCompleted()
{
boolean completed = receivers.isEmpty() && transfers.isEmpty();
@@ -803,6 +845,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
streamResult.handleSessionPrepared(this);
state(State.STREAMING);
+
for (StreamTransferTask task : transfers.values())
{
Collection<OutgoingFileMessage> messages = task.getFileMessages();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamState.java b/src/java/org/apache/cassandra/streaming/StreamState.java
index 4ee3c8d..be37677 100644
--- a/src/java/org/apache/cassandra/streaming/StreamState.java
+++ b/src/java/org/apache/cassandra/streaming/StreamState.java
@@ -18,11 +18,13 @@
package org.apache.cassandra.streaming;
import java.io.Serializable;
+import java.util.List;
import java.util.Set;
import java.util.UUID;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
/**
* Current snapshot of streaming progress.
@@ -50,4 +52,9 @@ public class StreamState implements Serializable
}
});
}
+
+ public List<SessionSummary> createSummaries()
+ {
+ return Lists.newArrayList(Iterables.transform(sessions, SessionInfo::createSummary));
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 4619561..ceaa4d1 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.UUIDSerializer;
/**
@@ -50,8 +51,9 @@ public class StreamInitMessage
public final boolean isForOutgoing;
public final boolean keepSSTableLevel;
public final UUID pendingRepair;
+ public final PreviewKind previewKind;
- public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean isForOutgoing, boolean keepSSTableLevel, UUID pendingRepair)
+ public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean isForOutgoing, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
{
this.from = from;
this.sessionIndex = sessionIndex;
@@ -60,6 +62,7 @@ public class StreamInitMessage
this.isForOutgoing = isForOutgoing;
this.keepSSTableLevel = keepSSTableLevel;
this.pendingRepair = pendingRepair;
+ this.previewKind = previewKind;
}
/**
@@ -120,6 +123,7 @@ public class StreamInitMessage
{
UUIDSerializer.serializer.serialize(message.pendingRepair, out, MessagingService.current_version);
}
+ out.writeInt(message.previewKind.getSerializationVal());
}
public StreamInitMessage deserialize(DataInputPlus in, int version) throws IOException
@@ -132,7 +136,8 @@ public class StreamInitMessage
boolean keepSSTableLevel = in.readBoolean();
UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
- return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, pendingRepair);
+ PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
+ return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, pendingRepair, previewKind);
}
public long serializedSize(StreamInitMessage message, int version)
@@ -148,6 +153,7 @@ public class StreamInitMessage
{
size += UUIDSerializer.serializer.serializedSize(message.pendingRepair, MessagingService.current_version);
}
+ size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
return size;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/tools/nodetool/Repair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 48f929f..317a677 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -32,6 +32,7 @@ import java.util.Set;
import com.google.common.collect.Sets;
import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.tools.NodeProbe;
@@ -73,6 +74,12 @@ public class Repair extends NodeToolCmd
@Option(title = "full", name = {"-full", "--full"}, description = "Use -full to issue a full repair.")
private boolean fullRepair = false;
+ @Option(title = "preview", name = {"-prv", "--preview"}, description = "Determine ranges and amount of data to be streamed, but don't actually perform repair")
+ private boolean preview = false;
+
+ @Option(title = "validate", name = {"-vd", "--validate"}, description = "Checks that repaired data is in sync between nodes. Out of sync repaired data indicates a full repair should be run.")
+ private boolean validate = false;
+
@Option(title = "job_threads", name = {"-j", "--job-threads"}, description = "Number of threads to run repair jobs. " +
"Usually this means number of CFs to repair concurrently. " +
"WARNING: increasing this puts more load on repairing nodes, so be careful. (default: 1, max: 4)")
@@ -84,6 +91,26 @@ public class Repair extends NodeToolCmd
@Option(title = "pull_repair", name = {"-pl", "--pull"}, description = "Use --pull to perform a one way repair where data is only streamed from a remote node to this node.")
private boolean pullRepair = false;
+ private PreviewKind getPreviewKind()
+ {
+ if (validate)
+ {
+ return PreviewKind.REPAIRED;
+ }
+ else if (preview && fullRepair)
+ {
+ return PreviewKind.ALL;
+ }
+ else if (preview)
+ {
+ return PreviewKind.UNREPAIRED;
+ }
+ else
+ {
+ return PreviewKind.NONE;
+ }
+ }
+
@Override
public void execute(NodeProbe probe)
{
@@ -112,6 +139,8 @@ public class Repair extends NodeToolCmd
options.put(RepairOption.TRACE_KEY, Boolean.toString(trace));
options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(cfnames, ","));
options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(pullRepair));
+ options.put(RepairOption.PREVIEW, getPreviewKind().toString());
+
if (!startToken.isEmpty() || !endToken.isEmpty())
{
options.put(RepairOption.RANGES_KEY, startToken + ":" + endToken);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/gms.EndpointState.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/gms.EndpointState.bin b/test/data/serialization/4.0/gms.EndpointState.bin
new file mode 100644
index 0000000..fb7d168
Binary files /dev/null and b/test/data/serialization/4.0/gms.EndpointState.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/gms.Gossip.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/gms.Gossip.bin b/test/data/serialization/4.0/gms.Gossip.bin
new file mode 100644
index 0000000..af5ac57
Binary files /dev/null and b/test/data/serialization/4.0/gms.Gossip.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/service.SyncComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.SyncComplete.bin b/test/data/serialization/4.0/service.SyncComplete.bin
new file mode 100644
index 0000000..ba84349
Binary files /dev/null and b/test/data/serialization/4.0/service.SyncComplete.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/service.SyncRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.SyncRequest.bin b/test/data/serialization/4.0/service.SyncRequest.bin
new file mode 100644
index 0000000..6d688a4
Binary files /dev/null and b/test/data/serialization/4.0/service.SyncRequest.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/service.ValidationComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.ValidationComplete.bin b/test/data/serialization/4.0/service.ValidationComplete.bin
new file mode 100644
index 0000000..7433d64
Binary files /dev/null and b/test/data/serialization/4.0/service.ValidationComplete.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/service.ValidationRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.ValidationRequest.bin b/test/data/serialization/4.0/service.ValidationRequest.bin
new file mode 100644
index 0000000..a00763b
Binary files /dev/null and b/test/data/serialization/4.0/service.ValidationRequest.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/utils.EstimatedHistogram.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/utils.EstimatedHistogram.bin b/test/data/serialization/4.0/utils.EstimatedHistogram.bin
new file mode 100644
index 0000000..e878eda
Binary files /dev/null and b/test/data/serialization/4.0/utils.EstimatedHistogram.bin differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index 04cb083..3611f0e 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -36,10 +36,11 @@ import java.util.Map;
public class AbstractSerializationsTester
{
- protected static final String CUR_VER = System.getProperty("cassandra.version", "3.0");
+ protected static final String CUR_VER = System.getProperty("cassandra.version", "4.0");
protected static final Map<String, Integer> VERSION_MAP = new HashMap<String, Integer> ()
{{
put("3.0", MessagingService.VERSION_30);
+ put("4.0", MessagingService.VERSION_40);
}};
protected static final boolean EXECUTE_WRITES = Boolean.getBoolean("cassandra.test-serialization-writes");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
index 0ee85c6..b9e3c17 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.Validator;
import org.apache.cassandra.schema.KeyspaceParams;
@@ -106,7 +107,8 @@ public class CompactionManagerGetSSTablesForValidationTest
Sets.newHashSet(range),
incremental,
incremental ? System.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE,
- true);
+ true,
+ PreviewKind.NONE);
desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, Collections.singleton(range));
}
@@ -135,7 +137,7 @@ public class CompactionManagerGetSSTablesForValidationTest
modifySSTables();
// get sstables for repair
- Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), true);
+ Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), true, PreviewKind.NONE);
Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator));
Assert.assertNotNull(sstables);
Assert.assertEquals(1, sstables.size());
@@ -150,7 +152,7 @@ public class CompactionManagerGetSSTablesForValidationTest
modifySSTables();
// get sstables for repair
- Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false);
+ Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false, PreviewKind.NONE);
Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator));
Assert.assertNotNull(sstables);
Assert.assertEquals(2, sstables.size());
@@ -166,7 +168,7 @@ public class CompactionManagerGetSSTablesForValidationTest
modifySSTables();
// get sstables for repair
- Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false);
+ Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false, PreviewKind.NONE);
Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator));
Assert.assertNotNull(sstables);
Assert.assertEquals(3, sstables.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 0a38cd9..360a2cd 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.notifications.SSTableAddedNotification;
import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.Validator;
import org.apache.cassandra.schema.CompactionParams;
@@ -193,9 +194,16 @@ public class LeveledCompactionStrategyTest
Range<Token> range = new Range<>(Util.token(""), Util.token(""));
int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
UUID parentRepSession = UUID.randomUUID();
- ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, ActiveRepairService.UNREPAIRED_SSTABLE, true);
+ ActiveRepairService.instance.registerParentRepairSession(parentRepSession,
+ FBUtilities.getBroadcastAddress(),
+ Arrays.asList(cfs),
+ Arrays.asList(range),
+ false,
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ true,
+ PreviewKind.NONE);
RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range));
- Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
+ Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore, PreviewKind.NONE);
CompactionManager.instance.submitValidation(cfs, validator).get();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index f11362f..b5f8036 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.FBUtilities;
@@ -50,7 +51,7 @@ public class StreamStateStoreTest
Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100"));
InetAddress local = FBUtilities.getBroadcastAddress();
- StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null);
+ StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null, PreviewKind.NONE);
session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf"));
StreamStateStore store = new StreamStateStore();
@@ -71,7 +72,7 @@ public class StreamStateStoreTest
// add different range within the same keyspace
Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200"));
- session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null);
+ session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null, PreviewKind.NONE);
session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf"));
session.state(StreamSession.State.COMPLETE);
store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 97bd321..53f5ab3 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -789,7 +790,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges(
Collections.singleton(new Range<Token>(firstToken, firstToken)),
- Collections.singleton(cfs), null);
+ Collections.singleton(cfs), null, PreviewKind.NONE);
assertEquals(1, sectionsBeforeRewrite.size());
for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite)
section.ref.release();
@@ -804,7 +805,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
while (!done.get())
{
Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken));
- List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), null);
+ List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), null, PreviewKind.NONE);
if (sections.size() != 1)
failed.set(true);
for (StreamSession.SSTableStreamingSections section : sections)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
index 1c508a0..d61d859 100644
--- a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
+++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.UUIDGen;
@@ -85,7 +86,8 @@ public abstract class AbstractRepairTest
Sets.newHashSet(RANGE1, RANGE2, RANGE3),
isIncremental,
repairedAt,
- isGlobal);
+ isGlobal,
+ PreviewKind.NONE);
return sessionId;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 75742dc..f5e9d6b 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.MerkleTrees;
@@ -91,7 +92,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
// note: we reuse the same endpoint which is bogus in theory but fine here
TreeResponse r1 = new TreeResponse(ep1, tree1);
TreeResponse r2 = new TreeResponse(ep2, tree2);
- LocalSyncTask task = new LocalSyncTask(desc, r1, r2, null, false);
+ LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
task.run();
assertEquals(0, task.get().numberOfDifferences);
@@ -105,9 +106,10 @@ public class LocalSyncTaskTest extends AbstractRepairTest
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
- ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(),
+ ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(),
Arrays.asList(cfs), Arrays.asList(range), false,
- ActiveRepairService.UNREPAIRED_SSTABLE, false);
+ ActiveRepairService.UNREPAIRED_SSTABLE, false,
+ PreviewKind.NONE);
RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
@@ -128,7 +130,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
// note: we reuse the same endpoint which is bogus in theory but fine here
TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1);
TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2);
- LocalSyncTask task = new LocalSyncTask(desc, r1, r2, null, false);
+ LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
task.run();
// ensure that the changed range was recorded
@@ -145,7 +147,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
- LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false);
+ LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, Lists.newArrayList(RANGE1));
assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair());
@@ -162,7 +164,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
- LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false);
+ LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false, PreviewKind.NONE);
StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, Lists.newArrayList(RANGE1));
assertEquals(desc.parentSessionId, plan.getPendingRepair());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 0260cd0..5a4e5b1 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.UUIDGen;
@@ -62,7 +63,10 @@ public class RepairSessionTest
IPartitioner p = Murmur3Partitioner.instance;
Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100)));
Set<InetAddress> endpoints = Sets.newHashSet(remote);
- RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, false, false, "Standard1");
+ RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange),
+ "Keyspace1", RepairParallelism.SEQUENTIAL,
+ endpoints, false, false,
+ PreviewKind.NONE, "Standard1");
// perform convict
session.convict(remote, Double.MAX_VALUE);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
index 5f13e3d..f433f2e 100644
--- a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.utils.UUIDGen;
@@ -64,8 +65,8 @@ public class StreamingRepairTaskTest extends AbstractRepairTest
UUID sessionID = registerSession(cfs, true, true);
ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges());
- SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges());
- StreamingRepairTask task = new StreamingRepairTask(desc, request, desc.sessionId);
+ SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE);
+ StreamingRepairTask task = new StreamingRepairTask(desc, request, desc.sessionId, PreviewKind.NONE);
StreamPlan plan = task.createStreamPlan(request.src, request.dst);
Assert.assertFalse(plan.getFlushBeforeTransfer());
@@ -77,8 +78,8 @@ public class StreamingRepairTaskTest extends AbstractRepairTest
UUID sessionID = registerSession(cfs, false, true);
ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges());
- SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges());
- StreamingRepairTask task = new StreamingRepairTask(desc, request, null);
+ SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE);
+ StreamingRepairTask task = new StreamingRepairTask(desc, request, null, PreviewKind.NONE);
StreamPlan plan = task.createStreamPlan(request.src, request.dst);
Assert.assertTrue(plan.getFlushBeforeTransfer());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index bbcdbb8..b45edc1 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.MerkleTrees;
@@ -98,7 +99,7 @@ public class ValidatorTest
ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
- Validator validator = new Validator(desc, remote, 0);
+ Validator validator = new Validator(desc, remote, 0, PreviewKind.NONE);
MerkleTrees tree = new MerkleTrees(partitioner);
tree.addMerkleTrees((int) Math.pow(2, 15), validator.desc.ranges);
validator.prepare(cfs, tree);
@@ -135,7 +136,7 @@ public class ValidatorTest
InetAddress remote = InetAddress.getByName("127.0.0.2");
- Validator validator = new Validator(desc, remote, 0);
+ Validator validator = new Validator(desc, remote, 0, PreviewKind.NONE);
validator.fail();
MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
@@ -190,10 +191,10 @@ public class ValidatorTest
ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
- false);
+ false, PreviewKind.NONE);
final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
- Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true, false);
+ Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true, false, PreviewKind.NONE);
CompactionManager.instance.submitValidation(cfs, validator);
MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
index 26168ad..367fea9 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.UUIDGen;
@@ -85,7 +86,8 @@ public abstract class AbstractConsistentSessionTest
Sets.newHashSet(RANGE1, RANGE2, RANGE3),
true,
System.currentTimeMillis(),
- true);
+ true,
+ PreviewKind.NONE);
return sessionId;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
index 2cb6326..2126835 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -146,7 +147,7 @@ public class PendingAntiCompactionTest
// create a session so the anti compaction can fine it
UUID sessionID = UUIDGen.getTimeUUID();
- ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddress.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true);
+ ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddress.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true, PreviewKind.NONE);
PendingAntiCompaction pac;
ExecutorService executor = Executors.newSingleThreadExecutor();