You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:35:59 UTC
[07/18] cassandra git commit: Transient Replication and Cheap Quorums
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
index a43e3eb..4af4a92 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.service.reads.repair;
-import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
@@ -27,24 +26,28 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.service.reads.DigestResolver;
/**
* Bypasses the read repair path for short read protection and testing
*/
-public class NoopReadRepair implements ReadRepair
+public class NoopReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L>
{
public static final NoopReadRepair instance = new NoopReadRepair();
private NoopReadRepair() {}
- public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+ @Override
+ public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicas)
{
return UnfilteredPartitionIterators.MergeListener.NOOP;
}
- public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+ @Override
+ public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer)
{
resultConsumer.accept(digestResolver.getData());
}
@@ -72,7 +75,7 @@ public class NoopReadRepair implements ReadRepair
}
@Override
- public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
{
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
index 6cf761a..4cae3ae 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -28,18 +28,18 @@ import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaLayout;
public class PartitionIteratorMergeListener implements UnfilteredPartitionIterators.MergeListener
{
- private final InetAddressAndPort[] sources;
+ private final ReplicaLayout replicaLayout;
private final ReadCommand command;
private final ConsistencyLevel consistency;
private final ReadRepair readRepair;
- public PartitionIteratorMergeListener(InetAddressAndPort[] sources, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
+ public PartitionIteratorMergeListener(ReplicaLayout replicaLayout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
{
- this.sources = sources;
+ this.replicaLayout = replicaLayout;
this.command = command;
this.consistency = consistency;
this.readRepair = readRepair;
@@ -47,10 +47,10 @@ public class PartitionIteratorMergeListener implements UnfilteredPartitionIterat
public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
{
- return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), sources, command, consistency, readRepair);
+ return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), replicaLayout, command, consistency, readRepair);
}
- private RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
+ protected RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)
{
Columns statics = Columns.NONE;
Columns regulars = Columns.NONE;
@@ -66,7 +66,7 @@ public class PartitionIteratorMergeListener implements UnfilteredPartitionIterat
return new RegularAndStaticColumns(statics, regulars);
}
- private boolean isReversed(List<UnfilteredRowIterator> versions)
+ protected boolean isReversed(List<UnfilteredRowIterator> versions)
{
for (UnfilteredRowIterator iter : versions)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
index d994b23..c13e2d6 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
@@ -21,27 +21,28 @@ package org.apache.cassandra.service.reads.repair;
import java.util.Map;
import com.codahale.metrics.Meter;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.metrics.ReadRepairMetrics;
/**
* Only performs the collection of data responses and reconciliation of them, doesn't send repair mutations
* to replicas. This preserves write atomicity, but doesn't provide monotonic quorum reads
*/
-public class ReadOnlyReadRepair extends AbstractReadRepair
+public class ReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractReadRepair<E, L>
{
- public ReadOnlyReadRepair(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency)
+ ReadOnlyReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
{
- super(command, queryStartNanoTime, consistency);
+ super(command, replicaLayout, queryStartNanoTime);
}
@Override
- public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints)
+ public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout)
{
return UnfilteredPartitionIterators.MergeListener.NOOP;
}
@@ -59,7 +60,7 @@ public class ReadOnlyReadRepair extends AbstractReadRepair
}
@Override
- public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations)
+ public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
{
throw new UnsupportedOperationException("ReadOnlyReadRepair shouldn't be trying to repair partitions");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
index 97f0f67..168f003 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -17,44 +17,45 @@
*/
package org.apache.cassandra.service.reads.repair;
-import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.locator.Endpoints;
+
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.service.reads.DigestResolver;
-public interface ReadRepair
+public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
{
public interface Factory
{
- ReadRepair create(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency);
+ <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime);
+ }
+
+ static <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaPlan, long queryStartNanoTime)
+ {
+ return command.metadata().params.readRepair.create(command, replicaPlan, queryStartNanoTime);
}
/**
* Used by DataResolver to generate corrections as the partition iterator is consumed
*/
- UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints);
+ UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout);
/**
* Called when the digests from the initial read don't match. Reads may block on the
* repair started by this method.
* @param digestResolver supplied so we can get the original data response
- * @param allEndpoints all available replicas for this read
- * @param contactedEndpoints the replicas we actually sent requests to
* @param resultConsumer hook for the repair to set its result on completion
*/
- public void startRepair(DigestResolver digestResolver,
- List<InetAddressAndPort> allEndpoints,
- List<InetAddressAndPort> contactedEndpoints,
- Consumer<PartitionIterator> resultConsumer);
+ public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer);
/**
* Block on the reads (or timeout) sent out in {@link ReadRepair#startRepair}
@@ -81,17 +82,13 @@ public interface ReadRepair
public void maybeSendAdditionalWrites();
/**
- * Hook for the merge listener to start repairs on individual partitions.
- */
- void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations);
-
- /**
* Block on any mutations (or timeout) we sent out to repair replicas in {@link ReadRepair#repairPartition}
*/
public void awaitWrites();
- static ReadRepair create(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency)
- {
- return command.metadata().params.readRepair.create(command, queryStartNanoTime, consistency);
- }
+ /**
+     * Repairs a partition _after_ receiving data responses. This method receives a replica list, since
+ * we will block repair only on the replicas that have responded.
+ */
+ void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
index 1117822..6eff395 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.service.reads.repair;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -38,8 +39,8 @@ final class ReadRepairDiagnostics
{
}
- static void startRepair(AbstractReadRepair readRepair, List<InetAddressAndPort> endpointDestinations,
- DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints)
+ static void startRepair(AbstractReadRepair readRepair, Collection<InetAddressAndPort> endpointDestinations,
+ DigestResolver digestResolver, Collection<InetAddressAndPort> allEndpoints)
{
if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.START_REPAIR))
service.publish(new ReadRepairEvent(ReadRepairEventType.START_REPAIR,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
index 152f7e6..9e14362 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.service.reads.repair;
import java.io.Serializable;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -48,9 +49,9 @@ final class ReadRepairEvent extends DiagnosticEvent
private final ConsistencyLevel consistency;
private final SpeculativeRetryPolicy.Kind speculativeRetry;
@VisibleForTesting
- final List<InetAddressAndPort> destinations;
+ final Collection<InetAddressAndPort> destinations;
@VisibleForTesting
- final List<InetAddressAndPort> allEndpoints;
+ final Collection<InetAddressAndPort> allEndpoints;
@Nullable
private final DigestResolverDebugResult[] digestsByEndpoint;
@@ -60,13 +61,13 @@ final class ReadRepairEvent extends DiagnosticEvent
SPECULATED_READ
}
- ReadRepairEvent(ReadRepairEventType type, AbstractReadRepair readRepair, List<InetAddressAndPort> destinations,
- List<InetAddressAndPort> allEndpoints, DigestResolver digestResolver)
+ ReadRepairEvent(ReadRepairEventType type, AbstractReadRepair readRepair, Collection<InetAddressAndPort> destinations,
+ Collection<InetAddressAndPort> allEndpoints, DigestResolver digestResolver)
{
this.keyspace = readRepair.cfs.keyspace;
this.tableName = readRepair.cfs.getTableName();
this.cqlCommand = readRepair.command.toCQLString();
- this.consistency = readRepair.consistency;
+ this.consistency = readRepair.replicaLayout.consistencyLevel();
this.speculativeRetry = readRepair.cfs.metadata().params.speculativeRetry.kind();
this.destinations = destinations;
this.allEndpoints = allEndpoints;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
index 5945633..28c0e9e 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
@@ -18,26 +18,25 @@
package org.apache.cassandra.service.reads.repair;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaLayout;
public enum ReadRepairStrategy implements ReadRepair.Factory
{
NONE
{
- @Override
- public ReadRepair create(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency)
+ public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime)
{
- return new ReadOnlyReadRepair(command, queryStartNanoTime, consistency);
+ return new ReadOnlyReadRepair<>(command, replicaLayout, queryStartNanoTime);
}
},
BLOCKING
{
- @Override
- public ReadRepair create(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency)
+ public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime)
{
- return new BlockingReadRepair(command, queryStartNanoTime, consistency);
+ return new BlockingReadRepair<>(command, replicaLayout, queryStartNanoTime);
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index cb6707d..b0c019a 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.reads.repair;
import java.util.Arrays;
import java.util.Map;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.cassandra.db.Clustering;
@@ -43,7 +44,9 @@ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowDiffListener;
import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.schema.ColumnMetadata;
public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeListener
@@ -51,14 +54,14 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
private final DecoratedKey partitionKey;
private final RegularAndStaticColumns columns;
private final boolean isReversed;
- private final InetAddressAndPort[] sources;
private final ReadCommand command;
private final ConsistencyLevel consistency;
private final PartitionUpdate.Builder[] repairs;
-
+ private final Replica[] sources;
private final Row.Builder[] currentRows;
private final RowDiffListener diffListener;
+ private final ReplicaLayout layout;
// The partition level deletion for the merge row.
private DeletionTime partitionLevelDeletion;
@@ -71,16 +74,21 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
private final ReadRepair readRepair;
- public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, InetAddressAndPort[] sources, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
+ public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaLayout layout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
{
this.partitionKey = partitionKey;
this.columns = columns;
this.isReversed = isReversed;
- this.sources = sources;
- repairs = new PartitionUpdate.Builder[sources.length];
- currentRows = new Row.Builder[sources.length];
- sourceDeletionTime = new DeletionTime[sources.length];
- markerToRepair = new ClusteringBound[sources.length];
+ Endpoints<?> sources = layout.selected();
+ this.sources = new Replica[sources.size()];
+ for (int i = 0; i < sources.size(); i++)
+ this.sources[i] = sources.get(i);
+
+ this.layout = layout;
+ repairs = new PartitionUpdate.Builder[sources.size()];
+ currentRows = new Row.Builder[sources.size()];
+ sourceDeletionTime = new DeletionTime[sources.size()];
+ markerToRepair = new ClusteringBound[sources.size()];
this.command = command;
this.consistency = consistency;
this.readRepair = readRepair;
@@ -89,25 +97,25 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
{
public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
{
- if (merged != null && !merged.equals(original))
+ if (merged != null && !merged.equals(original) && !isTransient(i))
currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged);
}
public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
{
- if (merged != null && !merged.equals(original))
+ if (merged != null && !merged.equals(original) && !isTransient(i))
currentRow(i, clustering).addRowDeletion(merged);
}
public void onComplexDeletion(int i, Clustering clustering, ColumnMetadata column, DeletionTime merged, DeletionTime original)
{
- if (merged != null && !merged.equals(original))
+ if (merged != null && !merged.equals(original) && !isTransient(i))
currentRow(i, clustering).addComplexDeletion(column, merged);
}
public void onCell(int i, Clustering clustering, Cell merged, Cell original)
{
- if (merged != null && !merged.equals(original) && isQueried(merged))
+ if (merged != null && !merged.equals(original) && isQueried(merged) && !isTransient(i))
currentRow(i, clustering).addCell(merged);
}
@@ -126,6 +134,11 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
};
}
+ private boolean isTransient(int i)
+ {
+ return sources[i].isTransient();
+ }
+
private PartitionUpdate.Builder update(int i)
{
if (repairs[i] == null)
@@ -159,6 +172,9 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
this.partitionLevelDeletion = mergedDeletion;
for (int i = 0; i < versions.length; i++)
{
+ if (isTransient(i))
+ continue;
+
if (mergedDeletion.supersedes(versions[i]))
update(i).addPartitionDeletion(mergedDeletion);
}
@@ -193,6 +209,9 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
for (int i = 0; i < versions.length; i++)
{
+ if (isTransient(i))
+ continue;
+
RangeTombstoneMarker marker = versions[i];
// Update what the source now thinks is the current deletion
@@ -245,12 +264,12 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
if (!marker.isBoundary() && marker.isOpen(isReversed)) // (1)
{
assert currentDeletion.equals(marker.openDeletionTime(isReversed))
- : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
+ : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
}
else // (2)
{
assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed))
- : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
+ : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
}
// and so unless it's a boundary whose opening deletion time is still equal to the current
@@ -306,13 +325,14 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
public void close()
{
- Map<InetAddressAndPort, Mutation> mutations = null;
+ Map<Replica, Mutation> mutations = null;
for (int i = 0; i < repairs.length; i++)
{
if (repairs[i] == null)
continue;
- Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, sources[i], false);
+ Preconditions.checkState(!isTransient(i), "cannot read repair transient replicas");
+ Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, sources[i].endpoint(), false);
if (mutation == null)
continue;
@@ -324,7 +344,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
if (mutations != null)
{
- readRepair.repairPartition(partitionKey, mutations, sources);
+ readRepair.repairPartition(partitionKey, mutations, layout);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
index 609d2a0..38c25dc 100644
--- a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
+++ b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,8 +44,10 @@ public class DefaultConnectionFactory implements StreamConnectionFactory
private static final int DEFAULT_CHANNEL_BUFFER_SIZE = 1 << 22;
- private static final long MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30);
- private static final int MAX_CONNECT_ATTEMPTS = 3;
+ @VisibleForTesting
+ public static long MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30);
+ @VisibleForTesting
+ public static int MAX_CONNECT_ATTEMPTS = 3;
@Override
public Channel createConnection(OutboundConnectionIdentifier connectionId, int protocolVersion) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index b56f165..2f6deb5 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -19,11 +19,14 @@ package org.apache.cassandra.streaming;
import java.util.*;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.utils.UUIDGen;
+import static com.google.common.collect.Iterables.all;
import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
/**
@@ -69,12 +72,13 @@ public class StreamPlan
*
* @param from endpoint address to fetch data from.
* @param keyspace name of keyspace
- * @param ranges ranges to fetch
+ * @param fullRanges ranges to fetch that from provides the full version of
+ * @param transientRanges ranges to fetch that from provides only transient data of
* @return this object for chaining
*/
- public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, Collection<Range<Token>> ranges)
+ public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges)
{
- return requestRanges(from, keyspace, ranges, EMPTY_COLUMN_FAMILIES);
+ return requestRanges(from, keyspace, fullRanges, transientRanges, EMPTY_COLUMN_FAMILIES);
}
/**
@@ -82,14 +86,20 @@ public class StreamPlan
*
* @param from endpoint address to fetch data from.
* @param keyspace name of keyspace
- * @param ranges ranges to fetch
+ * @param fullRanges ranges to fetch that from provides the full data for
+ * @param transientRanges ranges to fetch that from provides only transient data for
* @param columnFamilies specific column families
* @return this object for chaining
*/
- public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+ public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, String... columnFamilies)
{
+ //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
+ assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) :
+ fullRanges.toString();
+ assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) :
+ transientRanges.toString();
StreamSession session = coordinator.getOrCreateNextSession(from);
- session.addStreamRequest(keyspace, ranges, Arrays.asList(columnFamilies));
+ session.addStreamRequest(keyspace, fullRanges, transientRanges, Arrays.asList(columnFamilies));
return this;
}
@@ -98,14 +108,14 @@ public class StreamPlan
*
* @param to endpoint address of receiver
* @param keyspace name of keyspace
- * @param ranges ranges to send
+ * @param replicas ranges to send
* @param columnFamilies specific column families
* @return this object for chaining
*/
- public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
+ public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, RangesAtEndpoint replicas, String... columnFamilies)
{
StreamSession session = coordinator.getOrCreateNextSession(to);
- session.addTransferRanges(keyspace, ranges, Arrays.asList(columnFamilies), flushBeforeTransfer);
+ session.addTransferRanges(keyspace, replicas, Arrays.asList(columnFamilies), flushBeforeTransfer);
return this;
}
@@ -182,4 +192,10 @@ public class StreamPlan
{
return flushBeforeTransfer;
}
+
+ @VisibleForTesting
+ public StreamCoordinator getCoordinator()
+ {
+ return coordinator;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/StreamRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamRequest.java b/src/java/org/apache/cassandra/streaming/StreamRequest.java
index 4a3761e..f37268f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamRequest.java
+++ b/src/java/org/apache/cassandra/streaming/StreamRequest.java
@@ -29,6 +29,10 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.net.CompactEndpointSerializationHelper;
import org.apache.cassandra.net.MessagingService;
public class StreamRequest
@@ -36,12 +40,23 @@ public class StreamRequest
public static final IVersionedSerializer<StreamRequest> serializer = new StreamRequestSerializer();
public final String keyspace;
- public final Collection<Range<Token>> ranges;
+ //Full replicas and transient replicas are split based on the transient status of the remote we are fetching
+ //from. We preserve this distinction so on completion we can log to a system table whether we got the data transiently
+ //or fully from some remote. This is an important distinction for resumable bootstrap. The Replicas in these collections
+ //are local replicas (or dummy if this is triggered by repair) and don't encode the necessary information about
+ //what the remote provided.
+ public final RangesAtEndpoint full;
+ public final RangesAtEndpoint transientReplicas;
public final Collection<String> columnFamilies = new HashSet<>();
- public StreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies)
+
+ public StreamRequest(String keyspace, RangesAtEndpoint full, RangesAtEndpoint transientReplicas, Collection<String> columnFamilies)
{
this.keyspace = keyspace;
- this.ranges = ranges;
+ if (!full.endpoint().equals(transientReplicas.endpoint()))
+ throw new IllegalStateException("Mismatching endpoints: " + full + ", " + transientReplicas);
+
+ this.full = full;
+ this.transientReplicas = transientReplicas;
this.columnFamilies.addAll(columnFamilies);
}
@@ -50,49 +65,82 @@ public class StreamRequest
public void serialize(StreamRequest request, DataOutputPlus out, int version) throws IOException
{
out.writeUTF(request.keyspace);
- out.writeInt(request.ranges.size());
- for (Range<Token> range : request.ranges)
- {
- MessagingService.validatePartitioner(range);
- Token.serializer.serialize(range.left, out, version);
- Token.serializer.serialize(range.right, out, version);
- }
out.writeInt(request.columnFamilies.size());
+
+ CompactEndpointSerializationHelper.streamingInstance.serialize(request.full.endpoint(), out, version);
+ serializeReplicas(request.full, out, version);
+ serializeReplicas(request.transientReplicas, out, version);
for (String cf : request.columnFamilies)
out.writeUTF(cf);
}
- public StreamRequest deserialize(DataInputPlus in, int version) throws IOException
+ private void serializeReplicas(RangesAtEndpoint replicas, DataOutputPlus out, int version) throws IOException
{
- String keyspace = in.readUTF();
- int rangeCount = in.readInt();
- List<Range<Token>> ranges = new ArrayList<>(rangeCount);
- for (int i = 0; i < rangeCount; i++)
+ out.writeInt(replicas.size());
+
+ for (Replica replica : replicas)
{
- Token left = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version);
- Token right = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version);
- ranges.add(new Range<>(left, right));
+ MessagingService.validatePartitioner(replica.range());
+ Token.serializer.serialize(replica.range().left, out, version);
+ Token.serializer.serialize(replica.range().right, out, version);
}
+ }
+
+ public StreamRequest deserialize(DataInputPlus in, int version) throws IOException
+ {
+ String keyspace = in.readUTF();
int cfCount = in.readInt();
+ InetAddressAndPort endpoint = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version);
+
+ RangesAtEndpoint full = deserializeReplicas(in, version, endpoint, true);
+ RangesAtEndpoint transientReplicas = deserializeReplicas(in, version, endpoint, false);
List<String> columnFamilies = new ArrayList<>(cfCount);
for (int i = 0; i < cfCount; i++)
columnFamilies.add(in.readUTF());
- return new StreamRequest(keyspace, ranges, columnFamilies);
+ return new StreamRequest(keyspace, full, transientReplicas, columnFamilies);
}
- public long serializedSize(StreamRequest request, int version)
+ RangesAtEndpoint deserializeReplicas(DataInputPlus in, int version, InetAddressAndPort endpoint, boolean isFull) throws IOException
{
- int size = TypeSizes.sizeof(request.keyspace);
- size += TypeSizes.sizeof(request.ranges.size());
- for (Range<Token> range : request.ranges)
+ int replicaCount = in.readInt();
+
+ RangesAtEndpoint.Builder replicas = RangesAtEndpoint.builder(endpoint, replicaCount);
+ for (int i = 0; i < replicaCount; i++)
{
- size += Token.serializer.serializedSize(range.left, version);
- size += Token.serializer.serializedSize(range.right, version);
+ //TODO, super need to review the usage of streaming vs not streaming endpoint serialization helper
+ //to make sure I'm not using the wrong one some of the time, like do repair messages use the
+ //streaming version?
+ Token left = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version);
+ Token right = Token.serializer.deserialize(in, MessagingService.globalPartitioner(), version);
+ replicas.add(new Replica(endpoint, new Range<>(left, right), isFull));
}
+ return replicas.build();
+ }
+
+ public long serializedSize(StreamRequest request, int version)
+ {
+ int size = TypeSizes.sizeof(request.keyspace);
size += TypeSizes.sizeof(request.columnFamilies.size());
+ size += CompactEndpointSerializationHelper.streamingInstance.serializedSize(request.full.endpoint(), version);
+ size += replicasSerializedSize(request.transientReplicas, version);
+ size += replicasSerializedSize(request.full, version);
for (String cf : request.columnFamilies)
size += TypeSizes.sizeof(cf);
return size;
}
+
+ private long replicasSerializedSize(RangesAtEndpoint replicas, int version)
+ {
+ long size = 0;
+ size += TypeSizes.sizeof(replicas.size());
+
+ for (Replica replica : replicas)
+ {
+ size += Token.serializer.serializedSize(replica.range().left, version);
+ size += Token.serializer.serializedSize(replica.range().right, version);
+ }
+ return size;
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 393cd24..ec80772 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.Futures;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.RangesAtEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
@@ -49,6 +51,8 @@ import org.apache.cassandra.streaming.messages.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import static com.google.common.collect.Iterables.all;
+
/**
* Handles the streaming a one or more streams to and from a specific remote node.
*
@@ -243,7 +247,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
public StreamReceiver getAggregator(TableId tableId)
{
- assert receivers.containsKey(tableId);
+ assert receivers.containsKey(tableId) : "Missing tableId " + tableId;
return receivers.get(tableId).getReceiver();
}
@@ -297,38 +301,52 @@ public class StreamSession implements IEndpointStateChangeSubscriber
* Request data fetch task to this session.
*
* @param keyspace Requesting keyspace
- * @param ranges Ranges to retrieve data
+ * @param fullRanges Ranges to retrieve data that will return full data from the source
+ * @param transientRanges Ranges to retrieve data that will return transient data from the source
* @param columnFamilies ColumnFamily names. Can be empty if requesting all CF under the keyspace.
*/
- public void addStreamRequest(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies)
+ public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection<String> columnFamilies)
{
- requests.add(new StreamRequest(keyspace, ranges, columnFamilies));
+ //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
+ assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : fullRanges.toString();
+ assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : transientRanges.toString();
+ requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies));
}
/**
* Set up transfer for specific keyspace/ranges/CFs
*
* @param keyspace Transfer keyspace
- * @param ranges Transfer ranges
+ * @param replicas Transfer ranges
* @param columnFamilies Transfer ColumnFamilies
* @param flushTables flush tables?
*/
- synchronized void addTransferRanges(String keyspace, Collection<Range<Token>> ranges, Collection<String> columnFamilies, boolean flushTables)
+ synchronized void addTransferRanges(String keyspace, RangesAtEndpoint replicas, Collection<String> columnFamilies, boolean flushTables)
{
failIfFinished();
Collection<ColumnFamilyStore> stores = getColumnFamilyStores(keyspace, columnFamilies);
if (flushTables)
flushSSTables(stores);
- List<Range<Token>> normalizedRanges = Range.normalize(ranges);
- List<OutgoingStream> streams = getOutgoingStreamsForRanges(normalizedRanges, stores, pendingRepair, previewKind);
+ //Was it safe to remove this normalize, sorting seems not to matter, merging? Maybe we should have?
+ //Do we need to unwrap here also or is that just making it worse?
+ //Range and if it's transient
+ RangesAtEndpoint.Builder unwrappedRanges = RangesAtEndpoint.builder(replicas.endpoint(), replicas.size());
+ for (Replica replica : replicas)
+ {
+ for (Range<Token> unwrapped : replica.range().unwrap())
+ {
+ unwrappedRanges.add(new Replica(replica.endpoint(), unwrapped, replica.isFull()));
+ }
+ }
+ List<OutgoingStream> streams = getOutgoingStreamsForRanges(unwrappedRanges.build(), stores, pendingRepair, previewKind);
addTransferStreams(streams);
Set<Range<Token>> toBeUpdated = transferredRangesPerKeyspace.get(keyspace);
if (toBeUpdated == null)
{
toBeUpdated = new HashSet<>();
}
- toBeUpdated.addAll(ranges);
+ toBeUpdated.addAll(replicas.ranges());
transferredRangesPerKeyspace.put(keyspace, toBeUpdated);
}
@@ -355,14 +373,14 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
@VisibleForTesting
- public List<OutgoingStream> getOutgoingStreamsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind)
+ public List<OutgoingStream> getOutgoingStreamsForRanges(RangesAtEndpoint replicas, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind)
{
List<OutgoingStream> streams = new ArrayList<>();
try
{
for (ColumnFamilyStore cfs: stores)
{
- streams.addAll(cfs.getStreamManager().createOutgoingStreams(this, ranges, pendingRepair, previewKind));
+ streams.addAll(cfs.getStreamManager().createOutgoingStreams(this, replicas, pendingRepair, previewKind));
}
}
catch (Throwable t)
@@ -561,7 +579,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
for (StreamRequest request : requests)
- addTransferRanges(request.keyspace, request.ranges, request.columnFamilies, true); // always flush on stream request
+ addTransferRanges(request.keyspace, RangesAtEndpoint.concat(request.full, request.transientReplicas), request.columnFamilies, true); // always flush on stream request
for (StreamSummary summary : summaries)
prepareReceiving(summary);
@@ -812,4 +830,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
maybeCompleted();
}
+
+ @VisibleForTesting
+ public int getNumRequests()
+ {
+ return requests.size();
+ }
+
+ @VisibleForTesting
+ public int getNumTransfers()
+ {
+ return transferredRangesPerKeyspace.size();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/streaming/TableStreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/TableStreamManager.java b/src/java/org/apache/cassandra/streaming/TableStreamManager.java
index 11512e9..d97fabc 100644
--- a/src/java/org/apache/cassandra/streaming/TableStreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/TableStreamManager.java
@@ -21,8 +21,7 @@ package org.apache.cassandra.streaming;
import java.util.Collection;
import java.util.UUID;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.streaming.messages.StreamMessageHeader;
/**
@@ -46,12 +45,12 @@ public interface TableStreamManager
/**
* Returns a collection of {@link OutgoingStream}s that contains the data selected by the
- * given ranges, pendingRepair, and preview.
+ * given replicas, pendingRepair, and preview.
*
* There aren't any requirements on how data is divided between the outgoing streams
*/
Collection<OutgoingStream> createOutgoingStreams(StreamSession session,
- Collection<Range<Token>> ranges,
+ RangesAtEndpoint replicas,
UUID pendingRepair,
PreviewKind previewKind);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 03b8af0..54187d1 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -816,6 +816,11 @@ public class NodeProbe implements AutoCloseable
return ssProxy.getNaturalEndpoints(keyspace, cf, key);
}
+ public List<String> getReplicas(String keyspace, String cf, String key)
+ {
+ return ssProxy.getReplicas(keyspace, cf, key);
+ }
+
public List<String> getSSTables(String keyspace, String cf, String key, boolean hexFormat)
{
ColumnFamilyStoreMBean cfsProxy = getCfsProxy(keyspace, cf);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index c2193d4..1d09b0f 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -190,6 +192,7 @@ public class NodeTool
ReloadSslCertificates.class,
EnableAuditLog.class,
DisableAuditLog.class,
+ GetReplicas.class,
EnableOldProtocolVersions.class,
DisableOldProtocolVersions.class
);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
index 8056ff8..31d80fa 100644
--- a/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
+++ b/src/java/org/apache/cassandra/tools/SSTableRepairedAtSetter.java
@@ -88,11 +88,11 @@ public class SSTableRepairedAtSetter
if (setIsRepaired)
{
FileTime f = Files.getLastModifiedTime(new File(descriptor.filenameFor(Component.DATA)).toPath());
- descriptor.getMetadataSerializer().mutateRepaired(descriptor, f.toMillis(), null);
+ descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, f.toMillis(), null, false);
}
else
{
- descriptor.getMetadataSerializer().mutateRepaired(descriptor, 0, null);
+ descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, 0, null, false);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tools/nodetool/GetReplicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetReplicas.java b/src/java/org/apache/cassandra/tools/nodetool/GetReplicas.java
new file mode 100644
index 0000000..4c401fc
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetReplicas.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Command(name = "getreplicas", description = "Print replicas for a given key")
+public class GetReplicas extends NodeTool.NodeToolCmd
+{
+ @Arguments(usage = "<keyspace> <table> <key>", description = "The keyspace, the table, and the partition key for which we need to find replicas")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 3, "getreplicas requires keyspace, table and partition key arguments");
+ String ks = args.get(0);
+ String table = args.get(1);
+ String key = args.get(2);
+
+ System.out.println(probe.getReplicas(ks, table, key));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index a53846c..1e0813c 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -161,7 +161,7 @@ public abstract class TraceState implements ProgressEventNotifier
trace(MessageFormatter.format(format, arg1, arg2).getMessage());
}
- public void trace(String format, Object[] args)
+ public void trace(String format, Object... args)
{
trace(MessageFormatter.arrayFormat(format, args).getMessage());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 84af41c..8e0b19f 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -68,7 +68,7 @@ public class ErrorMessage extends Message.Response
ConsistencyLevel cl = CBUtil.readConsistencyLevel(body);
int required = body.readInt();
int alive = body.readInt();
- te = new UnavailableException(cl, required, alive);
+ te = UnavailableException.create(cl, required, alive);
}
break;
case OVERLOADED:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/utils/Pair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Pair.java b/src/java/org/apache/cassandra/utils/Pair.java
index ea8b8fc..cb09529 100644
--- a/src/java/org/apache/cassandra/utils/Pair.java
+++ b/src/java/org/apache/cassandra/utils/Pair.java
@@ -53,6 +53,18 @@ public class Pair<T1, T2>
return "(" + left + "," + right + ")";
}
+ //For functional interfaces
+ public T1 left()
+ {
+ return left;
+ }
+
+ //For functional interfaces
+ public T2 right()
+ {
+ return right;
+ }
+
public static <X, Y> Pair<X, Y> create(X x, Y y)
{
return new Pair<X, Y>(x, y);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
index e80faca..0c097a6 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
@@ -18,6 +18,8 @@
*/
package org.apache.cassandra.utils.concurrent;
+import java.util.AbstractCollection;
+import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -27,7 +29,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
*
* @param <E>
*/
-public class Accumulator<E> implements Iterable<E>
+public class Accumulator<E>
{
private volatile int nextIndex;
private volatile int presentCount;
@@ -105,7 +107,7 @@ public class Accumulator<E> implements Iterable<E>
return values.length;
}
- public Iterator<E> iterator()
+ private Iterator<E> iterator(int count)
{
return new Iterator<E>()
{
@@ -113,7 +115,7 @@ public class Accumulator<E> implements Iterable<E>
public boolean hasNext()
{
- return p < presentCount;
+ return p < count;
}
public E next()
@@ -135,4 +137,23 @@ public class Accumulator<E> implements Iterable<E>
throw new IndexOutOfBoundsException();
return (E) values[i];
}
+
+ public Collection<E> snapshot()
+ {
+ int count = presentCount;
+ return new AbstractCollection<E>()
+ {
+ @Override
+ public Iterator<E> iterator()
+ {
+ return Accumulator.this.iterator(count);
+ }
+
+ @Override
+ public int size()
+ {
+ return count;
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db
index ceaa5a3..8fad34f 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-CompressionInfo.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db
index 6968720..ae35335 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Data.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32 b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32
index f1c192b..8a92f3c 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Digest.crc32
@@ -1 +1 @@
-4004129384
\ No newline at end of file
+2977407251
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db
index af16195..d50fdeb 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Index.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db
index 970e385..7341864 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-Statistics.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt
index bb800f8..b03b283 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust/na-1-big-TOC.txt
@@ -1,8 +1,8 @@
-Digest.crc32
Filter.db
-CompressionInfo.db
+Digest.crc32
Index.db
-Summary.db
-Data.db
TOC.txt
+Summary.db
Statistics.db
+CompressionInfo.db
+Data.db
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db
index f5ad4d0..f0a1cfb 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-CompressionInfo.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db
index 7217716..b487fe8 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Data.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32 b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32
index 4f1391a..ca286e0 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Digest.crc32
@@ -1 +1 @@
-4072239034
\ No newline at end of file
+2759187708
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db
index 6dd3da6..c981a22 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Index.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db
index 3a0e63f..33fccc9 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-Statistics.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt
index bb800f8..b03b283 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_clust_counter/na-1-big-TOC.txt
@@ -1,8 +1,8 @@
-Digest.crc32
Filter.db
-CompressionInfo.db
+Digest.crc32
Index.db
-Summary.db
-Data.db
TOC.txt
+Summary.db
Statistics.db
+CompressionInfo.db
+Data.db
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db
index c665dfb..11219d0 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Data.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32 b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32
index c6c24a7..985d6dc 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Digest.crc32
@@ -1 +1 @@
-3772296151
\ No newline at end of file
+462858821
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db
index 6741430..3c68ac5 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-Statistics.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt
index bb800f8..b03b283 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple/na-1-big-TOC.txt
@@ -1,8 +1,8 @@
-Digest.crc32
Filter.db
-CompressionInfo.db
+Digest.crc32
Index.db
-Summary.db
-Data.db
TOC.txt
+Summary.db
Statistics.db
+CompressionInfo.db
+Data.db
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db
index d9fe576..620cdf2 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Data.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32 b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32
index de7baed..bc5f671 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Digest.crc32
@@ -1 +1 @@
-4035692752
\ No newline at end of file
+3987542254
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db
index e9556d1..689bec8 100644
Binary files a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db and b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-Statistics.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt
index bb800f8..b03b283 100644
--- a/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt
+++ b/test/data/legacy-sstables/na/legacy_tables/legacy_na_simple_counter/na-1-big-TOC.txt
@@ -1,8 +1,8 @@
-Digest.crc32
Filter.db
-CompressionInfo.db
+Digest.crc32
Index.db
-Summary.db
-Data.db
TOC.txt
+Summary.db
Statistics.db
+CompressionInfo.db
+Data.db
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java b/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
index a5025a3..94a3bd3 100644
--- a/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
+++ b/test/long/org/apache/cassandra/locator/DynamicEndpointSnitchLongTest.java
@@ -54,19 +54,21 @@ public class DynamicEndpointSnitchLongTest
DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode()));
InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
- List<InetAddressAndPort> hosts = new ArrayList<>();
+ EndpointsForRange.Builder replicasBuilder = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE);
// We want a big list of hosts so sorting takes time, making it much more likely to reproduce the
// problem we're looking for.
for (int i = 0; i < 100; i++)
for (int j = 0; j < 256; j++)
- hosts.add(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, (byte)i, (byte)j}));
+ replicasBuilder.add(ReplicaUtils.full(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, (byte)i, (byte)j})));
- ScoreUpdater updater = new ScoreUpdater(dsnitch, hosts);
+ EndpointsForRange replicas = replicasBuilder.build();
+
+ ScoreUpdater updater = new ScoreUpdater(dsnitch, replicas);
updater.start();
- List<InetAddressAndPort> result = null;
+ EndpointsForRange result = replicas;
for (int i = 0; i < ITERATIONS; i++)
- result = dsnitch.getSortedListByProximity(self, hosts);
+ result = dsnitch.sortedByProximity(self, result);
updater.stopped = true;
updater.join();
@@ -84,10 +86,10 @@ public class DynamicEndpointSnitchLongTest
public volatile boolean stopped;
private final DynamicEndpointSnitch dsnitch;
- private final List<InetAddressAndPort> hosts;
+ private final EndpointsForRange hosts;
private final Random random = new Random();
- public ScoreUpdater(DynamicEndpointSnitch dsnitch, List<InetAddressAndPort> hosts)
+ public ScoreUpdater(DynamicEndpointSnitch dsnitch, EndpointsForRange hosts)
{
this.dsnitch = dsnitch;
this.hosts = hosts;
@@ -97,9 +99,9 @@ public class DynamicEndpointSnitchLongTest
{
while (!stopped)
{
- InetAddressAndPort host = hosts.get(random.nextInt(hosts.size()));
+ Replica host = hosts.get(random.nextInt(hosts.size()));
int score = random.nextInt(SCORE_RANGE);
- dsnitch.receiveTiming(host, score);
+ dsnitch.receiveTiming(host.endpoint(), score);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
index 01e67f0..e37045a 100644
--- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -33,11 +33,10 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadataRef;
@@ -121,8 +120,8 @@ public class LongStreamingTest
private String ks;
public void init(String keyspace)
{
- for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
- addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort());
+ for (Replica range : StorageService.instance.getLocalReplicas(KS))
+ addRangeForEndpoint(range.range(), FBUtilities.getBroadcastAddressAndPort());
this.ks = keyspace;
}
@@ -148,8 +147,8 @@ public class LongStreamingTest
private String ks;
public void init(String keyspace)
{
- for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
- addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort());
+ for (Replica range : StorageService.instance.getLocalReplicas(KS))
+ addRangeForEndpoint(range.range(), FBUtilities.getBroadcastAddressAndPort());
this.ks = keyspace;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
index 68cfd7e..73a2b71 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java
@@ -21,18 +21,22 @@
package org.apache.cassandra.test.microbench;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.PendingRangeMaps;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaUtils;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
@@ -50,7 +54,7 @@ public class PendingRangesBench
PendingRangeMaps pendingRangeMaps;
int maxToken = 256 * 100;
- Multimap<Range<Token>, InetAddressAndPort> oldPendingRanges;
+ Multimap<Range<Token>, Replica> oldPendingRanges;
private Range<Token> genRange(String left, String right)
{
@@ -63,15 +67,17 @@ public class PendingRangesBench
pendingRangeMaps = new PendingRangeMaps();
oldPendingRanges = HashMultimap.create();
- InetAddressAndPort[] addresses = { InetAddressAndPort.getByName("127.0.0.1"), InetAddressAndPort.getByName("127.0.0.2")};
+ List<InetAddressAndPort> endpoints = Lists.newArrayList(InetAddressAndPort.getByName("127.0.0.1"),
+ InetAddressAndPort.getByName("127.0.0.2"));
for (int i = 0; i < maxToken; i++)
{
for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++)
{
Range<Token> range = genRange(Integer.toString(i * 10 + 5), Integer.toString(i * 10 + 15));
- pendingRangeMaps.addPendingRange(range, addresses[j]);
- oldPendingRanges.put(range, addresses[j]);
+ Replica replica = Replica.fullReplica(endpoints.get(j), range);
+ pendingRangeMaps.addPendingRange(range, replica);
+ oldPendingRanges.put(range, replica);
}
}
@@ -79,8 +85,9 @@ public class PendingRangesBench
for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++)
{
Range<Token> range = genRange(Integer.toString(maxToken * 10 + 5), Integer.toString(5));
- pendingRangeMaps.addPendingRange(range, addresses[j]);
- oldPendingRanges.put(range, addresses[j]);
+ Replica replica = Replica.fullReplica(endpoints.get(j), range);
+ pendingRangeMaps.addPendingRange(range, replica);
+ oldPendingRanges.put(range, replica);
}
}
@@ -97,13 +104,13 @@ public class PendingRangesBench
{
int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5);
Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken));
- Set<InetAddressAndPort> endpoints = new HashSet<>();
- for (Map.Entry<Range<Token>, Collection<InetAddressAndPort>> entry : oldPendingRanges.asMap().entrySet())
+ Set<Replica> replicas = new HashSet<>();
+ for (Map.Entry<Range<Token>, Collection<Replica>> entry : oldPendingRanges.asMap().entrySet())
{
if (entry.getKey().contains(searchToken))
- endpoints.addAll(entry.getValue());
+ replicas.addAll(entry.getValue());
}
- bh.consume(endpoints);
+ bh.consume(replicas);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 1201efa..bc2c19c 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -32,9 +32,11 @@ import java.util.function.Supplier;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +75,7 @@ import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class Util
@@ -446,6 +449,14 @@ public class Util
}
}
+ public static void consume(UnfilteredPartitionIterator iterator)
+ {
+ while (iterator.hasNext())
+ {
+ consume(iterator.next());
+ }
+ }
+
public static int size(PartitionIterator iter)
{
int size = 0;
@@ -478,6 +489,15 @@ public class Util
&& Iterators.elementsEqual(a, b);
}
+ public static boolean sameContent(RowIterator a, RowIterator b)
+ {
+ return Objects.equals(a.metadata(), b.metadata())
+ && Objects.equals(a.isReverseOrder(), b.isReverseOrder())
+ && Objects.equals(a.partitionKey(), b.partitionKey())
+ && Objects.equals(a.staticRow(), b.staticRow())
+ && Iterators.elementsEqual(a, b);
+ }
+
public static boolean sameContent(Mutation a, Mutation b)
{
if (!a.key().equals(b.key()) || !a.getTableIds().equals(b.getTableIds()))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 07ab3dc..782e3b1 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -104,6 +104,7 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.io.util.DataOutputPlus",
"org.apache.cassandra.io.util.DiskOptimizationStrategy",
"org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy",
+ "org.apache.cassandra.locator.Replica",
"org.apache.cassandra.locator.SimpleSeedProvider",
"org.apache.cassandra.locator.SeedProvider",
"org.apache.cassandra.net.BackPressureStrategy",
@@ -134,7 +135,9 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.config.EncryptionOptions$ServerEncryptionOptionsCustomizer",
"org.apache.cassandra.ConsoleAppenderBeanInfo",
"org.apache.cassandra.ConsoleAppenderCustomizer",
- "org.apache.cassandra.locator.InetAddressAndPort"
+ "org.apache.cassandra.locator.InetAddressAndPort",
+ "org.apache.cassandra.cql3.statements.schema.AlterKeyspaceStatement",
+ "org.apache.cassandra.cql3.statements.schema.CreateKeyspaceStatement"
};
static final Set<String> checkedClasses = new HashSet<>(Arrays.asList(validClasses));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 4a1a365..2fbbc28 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.functions.FunctionName;
@@ -148,7 +149,7 @@ public abstract class CQLTester
{
@Override public String getRack(InetAddressAndPort endpoint) { return RACK1; }
@Override public String getDatacenter(InetAddressAndPort endpoint) { return DATA_CENTER; }
- @Override public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; }
+ @Override public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { return 0; }
});
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index 184c5ad..37605d6 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.AbstractEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.SchemaKeyspace;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.schema.*;
@@ -527,7 +528,7 @@ public class CreateTest extends CQLTester
public String getDatacenter(InetAddressAndPort endpoint) { return "us-east-1"; }
@Override
- public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) { return 0; }
+ public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { return 0; }
});
// this forces the dc above to be added to the list of known datacenters (fixes static init problem
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org