You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:36:06 UTC
[14/18] cassandra git commit: Transient Replication and Cheap Quorums
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index db73b4f..8eb8603 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -21,6 +21,9 @@ import java.util.Collection;
import java.util.Set;
import java.util.UUID;
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.db.RowIndexEntry;
@@ -85,13 +88,15 @@ public class BigFormat implements SSTableFormat
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
TableMetadataRef metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
Collection<SSTableFlushObserver> observers,
LifecycleTransaction txn)
{
- return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers, txn);
+ SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient);
+ return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers, txn);
}
}
@@ -120,7 +125,7 @@ public class BigFormat implements SSTableFormat
// mb (3.0.7, 3.7): commit log lower bound included
// mc (3.0.8, 3.9): commit log intervals included
- // na (4.0.0): uncompressed chunks, pending repair session, checksummed sstable metadata file, new Bloomfilter format
+ // na (4.0.0): uncompressed chunks, pending repair session, isTransient, checksummed sstable metadata file, new Bloomfilter format
//
// NOTE: when adding a new version, please add that to LegacySSTableTest, too.
@@ -131,6 +136,7 @@ public class BigFormat implements SSTableFormat
public final boolean hasMaxCompressedLength;
private final boolean hasPendingRepair;
private final boolean hasMetadataChecksum;
+ private final boolean hasIsTransient;
/**
* CASSANDRA-9067: 4.0 bloom filter representation changed (two longs just swapped)
* have no 'static' bits caused by using the same upper bits for both bloom filter and token distribution.
@@ -148,6 +154,7 @@ public class BigFormat implements SSTableFormat
hasCommitLogIntervals = version.compareTo("mc") >= 0;
hasMaxCompressedLength = version.compareTo("na") >= 0;
hasPendingRepair = version.compareTo("na") >= 0;
+ hasIsTransient = version.compareTo("na") >= 0;
hasMetadataChecksum = version.compareTo("na") >= 0;
hasOldBfFormat = version.compareTo("na") < 0;
}
@@ -176,6 +183,12 @@ public class BigFormat implements SSTableFormat
}
@Override
+ public boolean hasIsTransient()
+ {
+ return hasIsTransient;
+ }
+
+ @Override
public int correspondingMessagingVersion()
{
return correspondingMessagingVersion;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index b5488ed..7513e95 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -68,13 +68,14 @@ public class BigTableWriter extends SSTableWriter
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
TableMetadataRef metadata,
MetadataCollector metadataCollector,
SerializationHeader header,
Collection<SSTableFlushObserver> observers,
LifecycleTransaction txn)
{
- super(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers);
+ super(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers);
txn.trackNew(this); // must track before any files are created
if (compression)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
index 6a40d94..eb7b2c7 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -71,7 +71,7 @@ public interface IMetadataSerializer
void mutateLevel(Descriptor descriptor, int newLevel) throws IOException;
/**
- * Mutate the repairedAt time and pendingRepair ID
+ * Mutate the repairedAt time, pendingRepair ID, and transient status
*/
- void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException;
+ public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 9d9c1a8..36c218b 100755
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -83,7 +83,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
ActiveRepairService.UNREPAIRED_SSTABLE,
-1,
-1,
- null);
+ null,
+ false);
}
protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
@@ -272,7 +273,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards;
}
- public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, UUID pendingRepair, SerializationHeader header)
+ public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, UUID pendingRepair, boolean isTransient, SerializationHeader header)
{
Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class);
components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
@@ -294,7 +295,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
repairedAt,
totalColumnsSet,
totalRows,
- pendingRepair));
+ pendingRepair,
+ isTransient));
components.put(MetadataType.COMPACTION, new CompactionMetadata(cardinality));
components.put(MetadataType.HEADER, header.toComponent());
return components;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
index 74923a0..f76db2d 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java
@@ -230,7 +230,7 @@ public class MetadataSerializer implements IMetadataSerializer
rewriteSSTableMetadata(descriptor, currentComponents);
}
- public void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException
+ public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException
{
if (logger.isTraceEnabled())
logger.trace("Mutating {} to repairedAt time {} and pendingRepair {}",
@@ -238,7 +238,7 @@ public class MetadataSerializer implements IMetadataSerializer
Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class));
StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS);
// mutate time & id
- currentComponents.put(MetadataType.STATS, stats.mutateRepairedAt(newRepairedAt).mutatePendingRepair(newPendingRepair));
+ currentComponents.put(MetadataType.STATS, stats.mutateRepairedMetadata(newRepairedAt, newPendingRepair, isTransient));
rewriteSSTableMetadata(descriptor, currentComponents);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 2b8ebef..f14fb5d 100755
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -64,6 +64,7 @@ public class StatsMetadata extends MetadataComponent
public final long totalColumnsSet;
public final long totalRows;
public final UUID pendingRepair;
+ public final boolean isTransient;
public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
EstimatedHistogram estimatedColumnCount,
@@ -83,7 +84,8 @@ public class StatsMetadata extends MetadataComponent
long repairedAt,
long totalColumnsSet,
long totalRows,
- UUID pendingRepair)
+ UUID pendingRepair,
+ boolean isTransient)
{
this.estimatedPartitionSize = estimatedPartitionSize;
this.estimatedColumnCount = estimatedColumnCount;
@@ -104,6 +106,7 @@ public class StatsMetadata extends MetadataComponent
this.totalColumnsSet = totalColumnsSet;
this.totalRows = totalRows;
this.pendingRepair = pendingRepair;
+ this.isTransient = isTransient;
}
public MetadataType getType()
@@ -155,10 +158,11 @@ public class StatsMetadata extends MetadataComponent
repairedAt,
totalColumnsSet,
totalRows,
- pendingRepair);
+ pendingRepair,
+ isTransient);
}
- public StatsMetadata mutateRepairedAt(long newRepairedAt)
+ public StatsMetadata mutateRepairedMetadata(long newRepairedAt, UUID newPendingRepair, boolean newIsTransient)
{
return new StatsMetadata(estimatedPartitionSize,
estimatedColumnCount,
@@ -178,30 +182,8 @@ public class StatsMetadata extends MetadataComponent
newRepairedAt,
totalColumnsSet,
totalRows,
- pendingRepair);
- }
-
- public StatsMetadata mutatePendingRepair(UUID newPendingRepair)
- {
- return new StatsMetadata(estimatedPartitionSize,
- estimatedColumnCount,
- commitLogIntervals,
- minTimestamp,
- maxTimestamp,
- minLocalDeletionTime,
- maxLocalDeletionTime,
- minTTL,
- maxTTL,
- compressionRatio,
- estimatedTombstoneDropTime,
- sstableLevel,
- minClusteringValues,
- maxClusteringValues,
- hasLegacyCounterShards,
- repairedAt,
- totalColumnsSet,
- totalRows,
- newPendingRepair);
+ newPendingRepair,
+ newIsTransient);
}
@Override
@@ -292,6 +274,12 @@ public class StatsMetadata extends MetadataComponent
if (component.pendingRepair != null)
size += UUIDSerializer.serializer.serializedSize(component.pendingRepair, 0);
}
+
+ if (version.hasIsTransient())
+ {
+ size += TypeSizes.sizeof(component.isTransient);
+ }
+
return size;
}
@@ -338,6 +326,11 @@ public class StatsMetadata extends MetadataComponent
out.writeByte(0);
}
}
+
+ if (version.hasIsTransient())
+ {
+ out.writeBoolean(component.isTransient);
+ }
}
public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
@@ -386,6 +379,8 @@ public class StatsMetadata extends MetadataComponent
pendingRepair = UUIDSerializer.serializer.deserialize(in, 0);
}
+ boolean isTransient = version.hasIsTransient() && in.readBoolean();
+
return new StatsMetadata(partitionSizes,
columnCounts,
commitLogIntervals,
@@ -404,7 +399,8 @@ public class StatsMetadata extends MetadataComponent
repairedAt,
totalColumnsSet,
totalRows,
- pendingRepair);
+ pendingRepair,
+ isTransient);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
index 2ee8eea..2e7408b 100644
--- a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
@@ -17,13 +17,12 @@
*/
package org.apache.cassandra.locator;
-import java.util.*;
-
+import com.google.common.collect.Iterables;
import org.apache.cassandra.config.DatabaseDescriptor;
public abstract class AbstractEndpointSnitch implements IEndpointSnitch
{
- public abstract int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2);
+ public abstract int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2);
/**
* Sorts the <tt>Collection</tt> of node addresses by proximity to the given address
@@ -31,27 +30,9 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch
* @param unsortedAddress the nodes to sort
* @return a new sorted <tt>List</tt>
*/
- public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress)
- {
- List<InetAddressAndPort> preferred = new ArrayList<>(unsortedAddress);
- sortByProximity(address, preferred);
- return preferred;
- }
-
- /**
- * Sorts the <tt>List</tt> of node addresses, in-place, by proximity to the given address
- * @param address the address to sort the proximity by
- * @param addresses the nodes to sort
- */
- public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
+ public <C extends ReplicaCollection<? extends C>> C sortedByProximity(final InetAddressAndPort address, C unsortedAddress)
{
- Collections.sort(addresses, new Comparator<InetAddressAndPort>()
- {
- public int compare(InetAddressAndPort a1, InetAddressAndPort a2)
- {
- return compareEndpoints(address, a1, a2);
- }
- });
+ return unsortedAddress.sorted((r1, r2) -> compareEndpoints(address, r1, r2));
}
public void gossiperStarting()
@@ -59,7 +40,7 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch
// noop by default
}
- public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2)
+ public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2)
{
// Querying remote DC is likely to be an order of magnitude slower than
// querying locally, so 2 queries to local nodes is likely to still be
@@ -70,14 +51,9 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch
: true;
}
- private boolean hasRemoteNode(List<InetAddressAndPort> l)
+ private boolean hasRemoteNode(ReplicaCollection<?> l)
{
String localDc = DatabaseDescriptor.getLocalDataCenter();
- for (InetAddressAndPort ep : l)
- {
- if (!localDc.equals(getDatacenter(ep)))
- return true;
- }
- return false;
+ return Iterables.any(l, replica -> !localDc.equals(getDatacenter(replica)));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
index e91f6ac..08c41f0 100644
--- a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
+++ b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java
@@ -37,8 +37,11 @@ public abstract class AbstractNetworkTopologySnitch extends AbstractEndpointSnit
*/
abstract public String getDatacenter(InetAddressAndPort endpoint);
- public int compareEndpoints(InetAddressAndPort address, InetAddressAndPort a1, InetAddressAndPort a2)
+ @Override
+ public int compareEndpoints(InetAddressAndPort address, Replica r1, Replica r2)
{
+ InetAddressAndPort a1 = r1.endpoint();
+ InetAddressAndPort a2 = r2.endpoint();
if (address.equals(a1) && !address.equals(a2))
return -1;
if (address.equals(a2) && !address.equals(a1))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
new file mode 100644
index 0000000..ecf1296
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+
+/**
+ * A collection-like class for Replica objects. Since the Replica class contains inetaddress, range, and
+ * transient replication status, basic contains and remove methods can be ambiguous. Replica collections force you
+ * to be explicit about what you're checking the container for, or removing from it.
+ */
+public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollection<C>> implements ReplicaCollection<C>
+{
+ protected static final List<Replica> EMPTY_LIST = new ArrayList<>(); // shared empty list: must never be mutated, so it can safely be returned to avoid megamorphic callsites
+
+ public static <C extends ReplicaCollection<C>, B extends Builder<C, ?, B>> Collector<Replica, B, C> collector(Set<Collector.Characteristics> characteristics, Supplier<B> supplier)
+ {
+ return new Collector<Replica, B, C>()
+ {
+ private final BiConsumer<B, Replica> accumulator = Builder::add;
+ private final BinaryOperator<B> combiner = (a, b) -> { a.addAll(b.mutable); return a; };
+ private final Function<B, C> finisher = Builder::build;
+ public Supplier<B> supplier() { return supplier; }
+ public BiConsumer<B, Replica> accumulator() { return accumulator; }
+ public BinaryOperator<B> combiner() { return combiner; }
+ public Function<B, C> finisher() { return finisher; }
+ public Set<Characteristics> characteristics() { return characteristics; }
+ };
+ }
+
+ protected final List<Replica> list;
+ protected final boolean isSnapshot;
+ protected AbstractReplicaCollection(List<Replica> list, boolean isSnapshot)
+ {
+ this.list = list;
+ this.isSnapshot = isSnapshot;
+ }
+
+ // if subList == null, should return self (or a clone thereof)
+ protected abstract C snapshot(List<Replica> subList);
+ protected abstract C self();
+ /**
+ * construct a new Mutable of our own type, so that we can concatenate
+ * TODO: this isn't terribly pretty, but we sometimes need to select / merge two Endpoints of unknown type.
+ */
+ public abstract Mutable<C> newMutable(int initialCapacity);
+
+
+ public C snapshot()
+ {
+ return isSnapshot ? self()
+ : snapshot(list.isEmpty() ? EMPTY_LIST
+ : new ArrayList<>(list));
+ }
+
+ public final C subList(int start, int end)
+ {
+ List<Replica> subList;
+ if (isSnapshot)
+ {
+ if (start == 0 && end == size()) return self();
+ else if (start == end) subList = EMPTY_LIST;
+ else subList = list.subList(start, end);
+ }
+ else
+ {
+ if (start == end) subList = EMPTY_LIST;
+ else subList = new ArrayList<>(list.subList(start, end)); // TODO: we could take a subList here, but comodification checks stop us
+ }
+ return snapshot(subList);
+ }
+
+ public final C filter(Predicate<Replica> predicate)
+ {
+ return filter(predicate, Integer.MAX_VALUE);
+ }
+
+ public final C filter(Predicate<Replica> predicate, int limit)
+ {
+ if (isEmpty())
+ return snapshot();
+
+ List<Replica> copy = null;
+ int beginRun = -1, endRun = -1;
+ int i = 0;
+ for (; i < list.size() ; ++i)
+ {
+ Replica replica = list.get(i);
+ if (predicate.test(replica))
+ {
+ if (copy != null)
+ copy.add(replica);
+ else if (beginRun < 0)
+ beginRun = i;
+ else if (endRun > 0)
+ {
+ copy = new ArrayList<>(Math.min(limit, (list.size() - i) + (endRun - beginRun)));
+ for (int j = beginRun ; j < endRun ; ++j)
+ copy.add(list.get(j));
+ copy.add(list.get(i));
+ }
+ if (--limit == 0)
+ {
+ ++i;
+ break;
+ }
+ }
+ else if (beginRun >= 0 && endRun < 0)
+ endRun = i;
+ }
+
+ if (beginRun < 0)
+ beginRun = endRun = 0;
+ if (endRun < 0)
+ endRun = i;
+ if (copy == null)
+ return subList(beginRun, endRun);
+ return snapshot(copy);
+ }
+
+ public final class Select
+ {
+ private final List<Replica> result;
+ public Select(int expectedSize)
+ {
+ this.result = new ArrayList<>(expectedSize);
+ }
+
+ /**
+ * Add matching replica to the result; this predicate should be mutually exclusive with all prior predicates.
+ * Stop once we have targetSize replicas in total, including preceding calls
+ */
+ public Select add(Predicate<Replica> predicate, int targetSize)
+ {
+ assert !Iterables.any(result, predicate::test);
+ for (int i = 0 ; result.size() < targetSize && i < list.size() ; ++i)
+ if (predicate.test(list.get(i)))
+ result.add(list.get(i));
+ return this;
+ }
+ public Select add(Predicate<Replica> predicate)
+ {
+ return add(predicate, Integer.MAX_VALUE);
+ }
+ public C get()
+ {
+ return snapshot(result);
+ }
+ }
+
+ /**
+ * An efficient method for selecting a subset of replicas via a sequence of filters.
+ *
+ * Example: select().add(filter1).add(filter2, 3).get();
+ *
+ * @return a Select object
+ */
+ public final Select select()
+ {
+ return select(list.size());
+ }
+ public final Select select(int expectedSize)
+ {
+ return new Select(expectedSize);
+ }
+
+ public final C sorted(Comparator<Replica> comparator)
+ {
+ List<Replica> copy = new ArrayList<>(list);
+ copy.sort(comparator);
+ return snapshot(copy);
+ }
+
+ public final Replica get(int i)
+ {
+ return list.get(i);
+ }
+
+ public final int size()
+ {
+ return list.size();
+ }
+
+ public final boolean isEmpty()
+ {
+ return list.isEmpty();
+ }
+
+ public final Iterator<Replica> iterator()
+ {
+ return list.iterator();
+ }
+
+ public final Stream<Replica> stream() { return list.stream(); }
+
+ public final boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (!(o instanceof AbstractReplicaCollection<?>))
+ {
+ if (!(o instanceof ReplicaCollection<?>))
+ return false;
+
+ ReplicaCollection<?> that = (ReplicaCollection<?>) o;
+ return Iterables.elementsEqual(this, that);
+ }
+ AbstractReplicaCollection<?> that = (AbstractReplicaCollection<?>) o;
+ return Objects.equals(list, that.list);
+ }
+
+ public final int hashCode()
+ {
+ return list.hashCode();
+ }
+
+ @Override
+ public final String toString()
+ {
+ return list.toString();
+ }
+
+ static <C extends AbstractReplicaCollection<C>> C concat(C replicas, C extraReplicas, Mutable.Conflict ignoreConflicts)
+ {
+ if (extraReplicas.isEmpty())
+ return replicas;
+ if (replicas.isEmpty())
+ return extraReplicas;
+ Mutable<C> mutable = replicas.newMutable(replicas.size() + extraReplicas.size());
+ mutable.addAll(replicas);
+ mutable.addAll(extraReplicas, ignoreConflicts);
+ return mutable.asImmutableView();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 3e9b5bb..0ddc0a4 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -22,8 +22,8 @@ import java.lang.reflect.InvocationTargetException;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
+import com.google.common.base.Preconditions;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,9 +73,9 @@ public abstract class AbstractReplicationStrategy
// lazy-initialize keyspace itself since we don't create them until after the replication strategies
}
- private final Map<Token, ArrayList<InetAddressAndPort>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddressAndPort>>();
+ private final Map<Token, EndpointsForRange> cachedReplicas = new NonBlockingHashMap<>();
- public ArrayList<InetAddressAndPort> getCachedEndpoints(Token t)
+ public EndpointsForRange getCachedReplicas(Token t)
{
long lastVersion = tokenMetadata.getRingVersion();
@@ -86,13 +86,13 @@ public abstract class AbstractReplicationStrategy
if (lastVersion > lastInvalidatedVersion)
{
logger.trace("clearing cached endpoints");
- cachedEndpoints.clear();
+ cachedReplicas.clear();
lastInvalidatedVersion = lastVersion;
}
}
}
- return cachedEndpoints.get(t);
+ return cachedReplicas.get(t);
}
/**
@@ -102,64 +102,65 @@ public abstract class AbstractReplicationStrategy
* @param searchPosition the position the natural endpoints are requested for
* @return a copy of the natural endpoints for the given token
*/
- public ArrayList<InetAddressAndPort> getNaturalEndpoints(RingPosition searchPosition)
+ public EndpointsForToken getNaturalReplicasForToken(RingPosition searchPosition)
+ {
+ return getNaturalReplicas(searchPosition).forToken(searchPosition.getToken());
+ }
+
+ public EndpointsForRange getNaturalReplicas(RingPosition searchPosition)
{
Token searchToken = searchPosition.getToken();
Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
- ArrayList<InetAddressAndPort> endpoints = getCachedEndpoints(keyToken);
+ EndpointsForRange endpoints = getCachedReplicas(keyToken);
if (endpoints == null)
{
TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap();
// if our cache got invalidated, it's possible there is a new token to account for too
keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken);
- endpoints = new ArrayList<InetAddressAndPort>(calculateNaturalEndpoints(searchToken, tm));
- cachedEndpoints.put(keyToken, endpoints);
+ endpoints = calculateNaturalReplicas(searchToken, tm);
+ cachedReplicas.put(keyToken, endpoints);
}
- return new ArrayList<InetAddressAndPort>(endpoints);
+ return endpoints;
}
/**
* calculate the natural endpoints for the given token
*
- * @see #getNaturalEndpoints(org.apache.cassandra.dht.RingPosition)
+ * @see #getNaturalReplicasForToken(org.apache.cassandra.dht.RingPosition)
*
* @param searchToken the token the natural endpoints are requested for
* @return a copy of the natural endpoints for the given token
*/
- public abstract List<InetAddressAndPort> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata);
+ public abstract EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata);
- public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints,
- Collection<InetAddressAndPort> pendingEndpoints,
- ConsistencyLevel consistency_level,
+ public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
Runnable callback,
WriteType writeType,
long queryStartNanoTime)
{
- return getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
+ return getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
}
- public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints,
- Collection<InetAddressAndPort> pendingEndpoints,
- ConsistencyLevel consistency_level,
+ public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
Runnable callback,
WriteType writeType,
long queryStartNanoTime,
ConsistencyLevel idealConsistencyLevel)
{
AbstractWriteResponseHandler resultResponseHandler;
- if (consistency_level.isDatacenterLocal())
+ if (replicaLayout.consistencyLevel.isDatacenterLocal())
{
// block for in this context will be localnodes block.
- resultResponseHandler = new DatacenterWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime);
+ resultResponseHandler = new DatacenterWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
}
- else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
+ else if (replicaLayout.consistencyLevel == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
{
- resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime);
+ resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
}
else
{
- resultResponseHandler = new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime);
+ resultResponseHandler = new WriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
}
//Check if tracking the ideal consistency level is configured
@@ -168,16 +169,14 @@ public abstract class AbstractReplicationStrategy
//If ideal and requested are the same just use this handler to track the ideal consistency level
//This is also used so that the ideal consistency level handler when constructed knows it is the ideal
//one for tracking purposes
- if (idealConsistencyLevel == consistency_level)
+ if (idealConsistencyLevel == replicaLayout.consistencyLevel)
{
resultResponseHandler.setIdealCLResponseHandler(resultResponseHandler);
}
else
{
//Construct a delegate response handler to use to track the ideal consistency level
- AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(naturalEndpoints,
- pendingEndpoints,
- idealConsistencyLevel,
+ AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaLayout.withConsistencyLevel(idealConsistencyLevel),
callback,
writeType,
queryStartNanoTime,
@@ -202,7 +201,12 @@ public abstract class AbstractReplicationStrategy
*
* @return the replication factor
*/
- public abstract int getReplicationFactor();
+ public abstract ReplicationFactor getReplicationFactor();
+
+ public boolean hasTransientReplicas()
+ {
+ return getReplicationFactor().hasTransientReplicas();
+ }
/*
* NOTE: this is pretty inefficient. also the inverse (getRangeAddresses) below.
@@ -210,53 +214,81 @@ public abstract class AbstractReplicationStrategy
* (fixing this would probably require merging tokenmetadata into replicationstrategy,
* so we could cache/invalidate cleanly.)
*/
- public Multimap<InetAddressAndPort, Range<Token>> getAddressRanges(TokenMetadata metadata)
+ public RangesByEndpoint getAddressReplicas(TokenMetadata metadata)
{
- Multimap<InetAddressAndPort, Range<Token>> map = HashMultimap.create();
+ RangesByEndpoint.Mutable map = new RangesByEndpoint.Mutable();
for (Token token : metadata.sortedTokens())
{
Range<Token> range = metadata.getPrimaryRangeFor(token);
- for (InetAddressAndPort ep : calculateNaturalEndpoints(token, metadata))
+ for (Replica replica : calculateNaturalReplicas(token, metadata))
{
- map.put(ep, range);
+ // LocalStrategy always returns (min, min] ranges for its replicas, so we skip the check here
+ Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy);
+ map.put(replica.endpoint(), replica);
}
}
- return map;
+ return map.asImmutableView();
}
- public Multimap<Range<Token>, InetAddressAndPort> getRangeAddresses(TokenMetadata metadata)
+ public RangesAtEndpoint getAddressReplicas(TokenMetadata metadata, InetAddressAndPort endpoint)
{
- Multimap<Range<Token>, InetAddressAndPort> map = HashMultimap.create();
+ RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint);
+ for (Token token : metadata.sortedTokens())
+ {
+ Range<Token> range = metadata.getPrimaryRangeFor(token);
+ Replica replica = calculateNaturalReplicas(token, metadata)
+ .byEndpoint().get(endpoint);
+ if (replica != null)
+ {
+ // LocalStrategy always returns (min, min] ranges for its replicas, so we skip the check here
+ Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy);
+ builder.add(replica, Conflict.DUPLICATE);
+ }
+ }
+ return builder.build();
+ }
+
+
+ public EndpointsByRange getRangeAddresses(TokenMetadata metadata)
+ {
+ EndpointsByRange.Mutable map = new EndpointsByRange.Mutable();
for (Token token : metadata.sortedTokens())
{
Range<Token> range = metadata.getPrimaryRangeFor(token);
- for (InetAddressAndPort ep : calculateNaturalEndpoints(token, metadata))
+ for (Replica replica : calculateNaturalReplicas(token, metadata))
{
- map.put(range, ep);
+ // LocalStrategy always returns (min, min] ranges for its replicas, so we skip the check here
+ Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy);
+ map.put(range, replica);
}
}
- return map;
+ return map.asImmutableView();
}
- public Multimap<InetAddressAndPort, Range<Token>> getAddressRanges()
+ public RangesByEndpoint getAddressReplicas()
{
- return getAddressRanges(tokenMetadata.cloneOnlyTokenMap());
+ return getAddressReplicas(tokenMetadata.cloneOnlyTokenMap());
}
- public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddressAndPort pendingAddress)
+ public RangesAtEndpoint getAddressReplicas(InetAddressAndPort endpoint)
{
- return getPendingAddressRanges(metadata, Arrays.asList(pendingToken), pendingAddress);
+ return getAddressReplicas(tokenMetadata.cloneOnlyTokenMap(), endpoint);
}
- public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Collection<Token> pendingTokens, InetAddressAndPort pendingAddress)
+ public RangesAtEndpoint getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddressAndPort pendingAddress)
+ {
+ return getPendingAddressRanges(metadata, Collections.singleton(pendingToken), pendingAddress);
+ }
+
+ public RangesAtEndpoint getPendingAddressRanges(TokenMetadata metadata, Collection<Token> pendingTokens, InetAddressAndPort pendingAddress)
{
TokenMetadata temp = metadata.cloneOnlyTokenMap();
temp.updateNormalTokens(pendingTokens, pendingAddress);
- return getAddressRanges(temp).get(pendingAddress);
+ return getAddressReplicas(temp, pendingAddress);
}
public abstract void validateOptions() throws ConfigurationException;
@@ -329,6 +361,10 @@ public abstract class AbstractReplicationStrategy
AbstractReplicationStrategy strategy = createInternal(keyspaceName, strategyClass, tokenMetadata, snitch, strategyOptions);
strategy.validateExpectedOptions();
strategy.validateOptions();
+ if (strategy.hasTransientReplicas() && !DatabaseDescriptor.isTransientReplicationEnabled())
+ {
+ throw new ConfigurationException("Transient replication is disabled. Enable in cassandra.yaml to use.");
+ }
}
public static Class<AbstractReplicationStrategy> getClass(String cls) throws ConfigurationException
@@ -344,21 +380,23 @@ public abstract class AbstractReplicationStrategy
public boolean hasSameSettings(AbstractReplicationStrategy other)
{
- return getClass().equals(other.getClass()) && getReplicationFactor() == other.getReplicationFactor();
+ return getClass().equals(other.getClass()) && getReplicationFactor().equals(other.getReplicationFactor());
}
- protected void validateReplicationFactor(String rf) throws ConfigurationException
+ protected void validateReplicationFactor(String s) throws ConfigurationException
{
try
{
- if (Integer.parseInt(rf) < 0)
+ ReplicationFactor rf = ReplicationFactor.fromString(s);
+ if (rf.hasTransientReplicas())
{
- throw new ConfigurationException("Replication factor must be non-negative; found " + rf);
+ if (DatabaseDescriptor.getNumTokens() > 1)
+ throw new ConfigurationException(String.format("Transient replication is not supported with vnodes yet"));
}
}
- catch (NumberFormatException e2)
+ catch (IllegalArgumentException e)
{
- throw new ConfigurationException("Replication factor must be numeric; found " + rf);
+ throw new ConfigurationException(e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 010c892..d35f1fb 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -42,7 +42,6 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-
/**
* A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
*/
@@ -185,55 +184,38 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
return subsnitch.getDatacenter(endpoint);
}
- public List<InetAddressAndPort> getSortedListByProximity(final InetAddressAndPort address, Collection<InetAddressAndPort> addresses)
- {
- List<InetAddressAndPort> list = new ArrayList<>(addresses);
- sortByProximity(address, list);
- return list;
- }
-
@Override
- public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
+ public <C extends ReplicaCollection<? extends C>> C sortedByProximity(final InetAddressAndPort address, C unsortedAddresses)
{
assert address.equals(FBUtilities.getBroadcastAddressAndPort()); // we only know about ourself
- if (dynamicBadnessThreshold == 0)
- {
- sortByProximityWithScore(address, addresses);
- }
- else
- {
- sortByProximityWithBadness(address, addresses);
- }
+ return dynamicBadnessThreshold == 0
+ ? sortedByProximityWithScore(address, unsortedAddresses)
+ : sortedByProximityWithBadness(address, unsortedAddresses);
}
- private void sortByProximityWithScore(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
+ private <C extends ReplicaCollection<? extends C>> C sortedByProximityWithScore(final InetAddressAndPort address, C unsortedAddresses)
{
// Scores can change concurrently from a call to this method. But Collections.sort() expects
// its comparator to be "stable", that is 2 endpoint should compare the same way for the duration
// of the sort() call. As we copy the scores map on write, it is thus enough to alias the current
// version of it during this call.
final HashMap<InetAddressAndPort, Double> scores = this.scores;
- Collections.sort(addresses, new Comparator<InetAddressAndPort>()
- {
- public int compare(InetAddressAndPort a1, InetAddressAndPort a2)
- {
- return compareEndpoints(address, a1, a2, scores);
- }
- });
+ return unsortedAddresses.sorted((r1, r2) -> compareEndpoints(address, r1, r2, scores));
}
- private void sortByProximityWithBadness(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
+ private <C extends ReplicaCollection<? extends C>> C sortedByProximityWithBadness(final InetAddressAndPort address, C replicas)
{
- if (addresses.size() < 2)
- return;
+ if (replicas.size() < 2)
+ return replicas;
- subsnitch.sortByProximity(address, addresses);
+ // TODO: avoid copy
+ replicas = subsnitch.sortedByProximity(address, replicas);
HashMap<InetAddressAndPort, Double> scores = this.scores; // Make sure the score don't change in the middle of the loop below
// (which wouldn't really matter here but its cleaner that way).
- ArrayList<Double> subsnitchOrderedScores = new ArrayList<>(addresses.size());
- for (InetAddressAndPort inet : addresses)
+ ArrayList<Double> subsnitchOrderedScores = new ArrayList<>(replicas.size());
+ for (Replica replica : replicas)
{
- Double score = scores.get(inet);
+ Double score = scores.get(replica.endpoint());
if (score == null)
score = 0.0;
subsnitchOrderedScores.add(score);
@@ -250,17 +232,18 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
{
if (subsnitchScore > (sortedScoreIterator.next() * (1.0 + dynamicBadnessThreshold)))
{
- sortByProximityWithScore(address, addresses);
- return;
+ return sortedByProximityWithScore(address, replicas);
}
}
+
+ return replicas;
}
// Compare endpoints given an immutable snapshot of the scores
- private int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2, Map<InetAddressAndPort, Double> scores)
+ private int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2, Map<InetAddressAndPort, Double> scores)
{
- Double scored1 = scores.get(a1);
- Double scored2 = scores.get(a2);
+ Double scored1 = scores.get(a1.endpoint());
+ Double scored2 = scores.get(a2.endpoint());
if (scored1 == null)
{
@@ -280,7 +263,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
return 1;
}
- public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+ public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2)
{
// That function is fundamentally unsafe because the scores can change at any time and so the result of that
// method is not stable for identical arguments. This is why we don't rely on super.sortByProximity() in
@@ -414,7 +397,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
return getSeverity(FBUtilities.getBroadcastAddressAndPort());
}
- public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2)
+ public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2)
{
if (!subsnitch.isWorthMergingForRangeQuery(merged, l1, l2))
return false;
@@ -434,12 +417,12 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
}
// Return the max score for the endpoint in the provided list, or -1.0 if no node have a score.
- private double maxScore(List<InetAddressAndPort> endpoints)
+ private double maxScore(ReplicaCollection<?> endpoints)
{
double maxScore = -1.0;
- for (InetAddressAndPort endpoint : endpoints)
+ for (Replica replica : endpoints)
{
- Double score = scores.get(endpoint);
+ Double score = scores.get(replica.endpoint());
if (score == null)
continue;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Ec2Snitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Ec2Snitch.java b/src/java/org/apache/cassandra/locator/Ec2Snitch.java
index b6aafd3..d0474e4 100644
--- a/src/java/org/apache/cassandra/locator/Ec2Snitch.java
+++ b/src/java/org/apache/cassandra/locator/Ec2Snitch.java
@@ -68,7 +68,7 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch
{
String az = awsApiCall(ZONE_NAME_QUERY_URL);
- // if using the full naming scheme, region name is created by removing letters from the
+ // if using the full naming scheme, region name is created by removing letters from the
// end of the availability zone and zone is the full zone name
usingLegacyNaming = isUsingLegacyNaming(props);
String region;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Endpoints.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Endpoints.java b/src/java/org/apache/cassandra/locator/Endpoints.java
new file mode 100644
index 0000000..3d5faa4
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/Endpoints.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
+import org.apache.cassandra.utils.FBUtilities;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Base class for replica collections that can be looked up by endpoint.
+ *
+ * Alongside the replica list held by the superclass, a map from endpoint to {@link Replica} is kept so
+ * that membership checks by endpoint do not need to scan the list. The map may be supplied on
+ * construction, or left {@code null} and built on first use by {@link #byEndpoint()}.
+ *
+ * @param <E> the concrete collection type returned by the filtering operations
+ */
+public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaCollection<E>
+{
+    /** Shared, unmodifiable, empty endpoint index. */
+    static final Map<InetAddressAndPort, Replica> EMPTY_MAP = Collections.unmodifiableMap(new LinkedHashMap<>());
+
+    // endpoint -> replica index; null until byEndpoint() is first called when not supplied on construction
+    volatile Map<InetAddressAndPort, Replica> byEndpoint;
+
+    /**
+     * @param list       the replicas, passed through to the superclass
+     * @param isSnapshot passed through to the superclass
+     * @param byEndpoint a pre-built endpoint index for {@code list}, or {@code null} to build it lazily
+     */
+    Endpoints(List<Replica> list, boolean isSnapshot, Map<InetAddressAndPort, Replica> byEndpoint)
+    {
+        super(list, isSnapshot);
+        this.byEndpoint = byEndpoint;
+    }
+
+    /**
+     * @return the key set of {@link #byEndpoint()}
+     */
+    @Override
+    public Set<InetAddressAndPort> endpoints()
+    {
+        return byEndpoint().keySet();
+    }
+
+    /**
+     * Returns the endpoint index, building it from the replica list if it has not been set yet.
+     * No lock is taken, so concurrent first callers may each build their own copy.
+     *
+     * @return a map from endpoint to the replica held for that endpoint
+     */
+    public Map<InetAddressAndPort, Replica> byEndpoint()
+    {
+        Map<InetAddressAndPort, Replica> map = byEndpoint;
+        if (map == null)
+            byEndpoint = map = buildByEndpoint(list);
+        return map;
+    }
+
+    /**
+     * @param endpoint the endpoint to look up
+     * @param isFull   the value {@link Replica#isFull()} must have for the replica to match
+     * @return true if a replica exists for {@code endpoint} and its fullness equals {@code isFull}
+     */
+    public boolean contains(InetAddressAndPort endpoint, boolean isFull)
+    {
+        Replica replica = byEndpoint().get(endpoint);
+        return replica != null && replica.isFull() == isFull;
+    }
+
+    /**
+     * @param replica the replica to look for; {@code null} is never contained
+     * @return true if the replica indexed under {@code replica.endpoint()} equals {@code replica}
+     */
+    @Override
+    public boolean contains(Replica replica)
+    {
+        return replica != null
+               && Objects.equals(
+                       byEndpoint().get(replica.endpoint()),
+                       replica);
+    }
+
+    /**
+     * Builds an unmodifiable endpoint index over {@code list}, preserving list order.
+     * With assertions enabled, two replicas sharing an endpoint trigger an {@link AssertionError}.
+     */
+    private static Map<InetAddressAndPort, Replica> buildByEndpoint(List<Replica> list)
+    {
+        // TODO: implement a delegating map that uses our superclass' list, and is immutable
+        Map<InetAddressAndPort, Replica> byEndpoint = new LinkedHashMap<>(list.size());
+        for (Replica replica : list)
+        {
+            Replica prev = byEndpoint.put(replica.endpoint(), replica);
+            // this helper serves every Endpoints subclass, so the message names the base class
+            assert prev == null : "duplicate endpoint in Endpoints: " + prev + " and " + replica;
+        }
+
+        return Collections.unmodifiableMap(byEndpoint);
+    }
+
+    /**
+     * @return this collection filtered to exclude the replica whose endpoint is the local broadcast address
+     */
+    public E withoutSelf()
+    {
+        InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
+        return filter(r -> !self.equals(r.endpoint()));
+    }
+
+    /**
+     * @param remove endpoints to drop
+     * @return this collection filtered to replicas whose endpoint is not in {@code remove}
+     */
+    public E without(Set<InetAddressAndPort> remove)
+    {
+        return filter(r -> !remove.contains(r.endpoint()));
+    }
+
+    /**
+     * @param keep endpoints to retain
+     * @return this collection filtered to replicas whose endpoint is in {@code keep}
+     */
+    public E keep(Set<InetAddressAndPort> keep)
+    {
+        return filter(r -> keep.contains(r.endpoint()));
+    }
+
+    /**
+     * Selects the replicas for the given endpoints, visiting them in the iteration order of
+     * {@code endpoints}. Endpoints with no replica in this collection are skipped; an endpoint
+     * repeated in the argument is added with {@link Conflict#DUPLICATE}.
+     *
+     * @param endpoints the endpoints to retain
+     * @return a snapshot containing the matching replicas
+     */
+    public E keep(Iterable<InetAddressAndPort> endpoints)
+    {
+        // only the size is needed, so a wildcard cast avoids an unchecked conversion
+        int capacity = endpoints instanceof Collection<?>
+                       ? ((Collection<?>) endpoints).size()
+                       : size();
+        ReplicaCollection.Mutable<E> copy = newMutable(capacity);
+        Map<InetAddressAndPort, Replica> index = byEndpoint();
+        for (InetAddressAndPort endpoint : endpoints)
+        {
+            Replica match = index.get(endpoint);
+            if (match == null)
+                continue;
+            copy.add(match, Conflict.DUPLICATE);
+        }
+        return copy.asSnapshot();
+    }
+
+    /**
+     * Care must be taken to ensure no conflicting ranges occur in pending and natural.
+     * Conflicts can occur for two reasons:
+     *   1) due to lack of isolation when reading pending/natural
+     *   2) because a movement that changes the type of replication from transient to full must be handled
+     *      differently for reads and writes (with the reader treating it as transient, and writer as full)
+     *
+     * The method haveConflicts() below, and resolveConflictsInX, are used to detect and resolve any issues
+     */
+    public static <E extends Endpoints<E>> E concat(E natural, E pending)
+    {
+        return AbstractReplicaCollection.concat(natural, pending, Conflict.NONE);
+    }
+
+    /**
+     * @return true if at least one endpoint appears in both {@code natural} and {@code pending}
+     */
+    public static <E extends Endpoints<E>> boolean haveConflicts(E natural, E pending)
+    {
+        Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
+        for (InetAddressAndPort pendingEndpoint : pending.endpoints())
+        {
+            if (naturalEndpoints.contains(pendingEndpoint))
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * Must apply first. Drops from {@code natural} every transient replica whose endpoint is
+     * present in {@code pending} as a full replica.
+     */
+    public static <E extends Endpoints<E>> E resolveConflictsInNatural(E natural, E pending)
+    {
+        return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true));
+    }
+
+    /**
+     * Must apply second. Drops from {@code pending} every replica whose endpoint is present in
+     * {@code natural}.
+     */
+    public static <E extends Endpoints<E>> E resolveConflictsInPending(E natural, E pending)
+    {
+        return pending.without(natural.endpoints());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsByRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsByRange.java b/src/java/org/apache/cassandra/locator/EndpointsByRange.java
new file mode 100644
index 0000000..cdc8a68
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/EndpointsByRange.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A multimap from token range to the {@link EndpointsForRange} held for that range.
+ */
+public class EndpointsByRange extends ReplicaMultimap<Range<Token>, EndpointsForRange>
+{
+    public EndpointsByRange(Map<Range<Token>, EndpointsForRange> map)
+    {
+        super(map);
+    }
+
+    /**
+     * @param range the range to look up; must not be null
+     * @return the endpoints stored for {@code range}, or an empty collection for that range if none are stored
+     */
+    public EndpointsForRange get(Range<Token> range)
+    {
+        Preconditions.checkNotNull(range);
+        EndpointsForRange fallback = EndpointsForRange.empty(range);
+        return map.getOrDefault(range, fallback);
+    }
+
+    /**
+     * Mutable counterpart, accumulating an {@link EndpointsForRange.Mutable} per range.
+     */
+    public static class Mutable extends ReplicaMultimap.Mutable<Range<Token>, EndpointsForRange.Mutable>
+    {
+        @Override
+        protected EndpointsForRange.Mutable newMutable(Range<Token> range)
+        {
+            return new EndpointsForRange.Mutable(range);
+        }
+
+        // TODO: consider all ignoreDuplicates cases
+        /**
+         * Adds every replica in {@code replicas} to the collection held for {@code range}.
+         *
+         * @param ignoreConflicts how conflicting replicas are treated, as interpreted by {@code addAll}
+         */
+        public void putAll(Range<Token> range, EndpointsForRange replicas, Conflict ignoreConflicts)
+        {
+            get(range).addAll(replicas, ignoreConflicts);
+        }
+
+        /**
+         * @return an unmodifiable map-backed view whose values are the immutable views of the
+         *         per-range mutable collections
+         */
+        public EndpointsByRange asImmutableView()
+        {
+            Map<Range<Token>, EndpointsForRange> views = Maps.transformValues(map, EndpointsForRange.Mutable::asImmutableView);
+            return new EndpointsByRange(Collections.unmodifiableMap(views));
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsByReplica.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsByReplica.java b/src/java/org/apache/cassandra/locator/EndpointsByReplica.java
new file mode 100644
index 0000000..ceea2d1
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/EndpointsByReplica.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A multimap from a {@link Replica} to the {@link EndpointsForRange} held for it.
+ */
+public class EndpointsByReplica extends ReplicaMultimap<Replica, EndpointsForRange>
+{
+    public EndpointsByReplica(Map<Replica, EndpointsForRange> map)
+    {
+        super(map);
+    }
+
+    /**
+     * @param range the replica key to look up; must not be null
+     * @return the endpoints stored for the key, or an empty collection over the key's range if none are stored
+     */
+    public EndpointsForRange get(Replica range)
+    {
+        Preconditions.checkNotNull(range);
+        EndpointsForRange fallback = EndpointsForRange.empty(range.range());
+        return map.getOrDefault(range, fallback);
+    }
+
+    /**
+     * Mutable counterpart, accumulating an {@link EndpointsForRange.Mutable} per replica key.
+     */
+    public static class Mutable extends ReplicaMultimap.Mutable<Replica, EndpointsForRange.Mutable>
+    {
+        @Override
+        protected EndpointsForRange.Mutable newMutable(Replica replica)
+        {
+            return new EndpointsForRange.Mutable(replica.range());
+        }
+
+        // TODO: consider all ignoreDuplicates cases
+        /**
+         * Adds every replica in {@code replicas} to the collection held for the key {@code range},
+         * creating that collection first if the key is not yet present.
+         *
+         * @param ignoreConflicts how conflicting replicas are treated, as interpreted by {@code addAll}
+         */
+        public void putAll(Replica range, EndpointsForRange replicas, Conflict ignoreConflicts)
+        {
+            map.computeIfAbsent(range, this::newMutable).addAll(replicas, ignoreConflicts);
+        }
+
+        /**
+         * @return an unmodifiable map-backed view whose values are the immutable views of the
+         *         per-key mutable collections
+         */
+        public EndpointsByReplica asImmutableView()
+        {
+            Map<Replica, EndpointsForRange> views = Maps.transformValues(map, EndpointsForRange.Mutable::asImmutableView);
+            return new EndpointsByReplica(Collections.unmodifiableMap(views));
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsForRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsForRange.java b/src/java/org/apache/cassandra/locator/EndpointsForRange.java
new file mode 100644
index 0000000..c2d8232
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/EndpointsForRange.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Preconditions;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.collect.Iterables.all;
+
+/**
+ * A ReplicaCollection where all Replica are required to cover a range that fully contains the range() defined in the builder().
+ * Endpoints are guaranteed to be unique; on construction, this is enforced unless optionally silenced (in which case
+ * only the first occurrence makes the cut).
+ */
+public class EndpointsForRange extends Endpoints<EndpointsForRange>
+{
+    // the range this collection is for; Mutable.add rejects replicas whose own range does not contain it
+    private final Range<Token> range;
+
+    private EndpointsForRange(Range<Token> range, List<Replica> list, boolean isSnapshot)
+    {
+        this(range, list, isSnapshot, null);
+    }
+
+    /**
+     * @param range      the range this collection is for; must not be null (asserted)
+     * @param list       the replicas, passed through to the superclass
+     * @param isSnapshot passed through to the superclass
+     * @param byEndpoint endpoint index for {@code list}, or {@code null} to have it built lazily
+     */
+    private EndpointsForRange(Range<Token> range, List<Replica> list, boolean isSnapshot, Map<InetAddressAndPort, Replica> byEndpoint)
+    {
+        super(list, isSnapshot, byEndpoint);
+        this.range = range;
+        assert range != null;
+    }
+
+    /**
+     * @return the range this collection is for
+     */
+    public Range<Token> range()
+    {
+        return range;
+    }
+
+    @Override
+    public Mutable newMutable(int initialCapacity)
+    {
+        return new Mutable(range, initialCapacity);
+    }
+
+    /**
+     * Re-types this collection for a single token, sharing the same list and endpoint index.
+     *
+     * @param token a token inside {@link #range()}
+     * @return an {@link EndpointsForToken} over the same replicas
+     * @throws IllegalArgumentException if {@code token} is not contained within the range
+     */
+    public EndpointsForToken forToken(Token token)
+    {
+        if (!range.contains(token))
+            throw new IllegalArgumentException(token + " is not contained within " + range);
+        return new EndpointsForToken(token, list, isSnapshot, byEndpoint);
+    }
+
+    @Override
+    public EndpointsForRange self()
+    {
+        return this;
+    }
+
+    @Override
+    protected EndpointsForRange snapshot(List<Replica> snapshot)
+    {
+        if (snapshot.isEmpty()) return empty(range);
+        return new EndpointsForRange(range, snapshot, true);
+    }
+
+    /**
+     * A modifiable {@link EndpointsForRange}. Replicas are appended with {@link #add}; once
+     * {@link #asSnapshot()} has been called, further additions are rejected.
+     */
+    public static class Mutable extends EndpointsForRange implements ReplicaCollection.Mutable<EndpointsForRange>
+    {
+        // set by asSnapshot(); add() refuses to modify the collection afterwards
+        boolean hasSnapshot;
+        public Mutable(Range<Token> range) { this(range, 0); }
+        public Mutable(Range<Token> range, int capacity) { super(range, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+
+        /**
+         * Appends {@code replica} unless a replica with the same endpoint is already present.
+         *
+         * When the endpoint is already present, the existing entry is always kept and
+         * {@code ignoreConflict} decides whether that is an error:
+         * {@code ALL} silently ignores the new replica; {@code DUPLICATE} ignores it only if it
+         * equals the existing one; {@code NONE} always throws.
+         *
+         * @param replica        the replica to add; must not be null, and its range must contain this collection's range
+         * @param ignoreConflict the conflict policy described above
+         * @throws IllegalStateException    if {@link #asSnapshot()} has already been called
+         * @throws IllegalArgumentException if the replica's range does not contain this collection's range,
+         *                                  or on a conflict not permitted by {@code ignoreConflict}
+         */
+        public void add(Replica replica, Conflict ignoreConflict)
+        {
+            if (hasSnapshot) throw new IllegalStateException("Cannot add to a collection after asSnapshot() has been called");
+            Preconditions.checkNotNull(replica);
+            if (!replica.range().contains(super.range))
+                throw new IllegalArgumentException("Replica " + replica + " does not contain " + super.range);
+
+            // putIfAbsent leaves an existing mapping untouched, so nothing needs restoring on conflict
+            Replica prev = super.byEndpoint.putIfAbsent(replica.endpoint(), replica);
+            if (prev != null)
+            {
+                switch (ignoreConflict)
+                {
+                    case DUPLICATE:
+                        if (prev.equals(replica))
+                            break;
+                        // deliberate fall-through: a non-identical replica is a genuine conflict
+                    case NONE:
+                        throw new IllegalArgumentException("Conflicting replica added (expected unique endpoints): " + replica + "; existing: " + prev);
+                    case ALL:
+                }
+                return;
+            }
+
+            list.add(replica);
+        }
+
+        @Override
+        public Map<InetAddressAndPort, Replica> byEndpoint()
+        {
+            // our internal map is modifiable, but it is unsafe to modify the map externally
+            // it would be possible to implement a safe modifiable map, but it is probably not valuable
+            return Collections.unmodifiableMap(super.byEndpoint());
+        }
+
+        // wraps the live list and an unmodifiable view of the live index; nothing is copied
+        private EndpointsForRange get(boolean isSnapshot)
+        {
+            return new EndpointsForRange(super.range, super.list, isSnapshot, Collections.unmodifiableMap(super.byEndpoint));
+        }
+
+        /**
+         * @return a non-snapshot collection backed by this mutable's list and index
+         */
+        public EndpointsForRange asImmutableView()
+        {
+            return get(false);
+        }
+
+        /**
+         * Marks this mutable as finished, so later calls to {@link #add} throw, and returns its contents.
+         *
+         * @return a snapshot collection backed by this mutable's list and index
+         */
+        public EndpointsForRange asSnapshot()
+        {
+            hasSnapshot = true;
+            return get(true);
+        }
+    }
+
+    /**
+     * Builder over a {@link Mutable} for a fixed range.
+     */
+    public static class Builder extends ReplicaCollection.Builder<EndpointsForRange, Mutable, EndpointsForRange.Builder>
+    {
+        public Builder(Range<Token> range) { this(range, 0); }
+        public Builder(Range<Token> range, int capacity) { super (new Mutable(range, capacity)); }
+
+        /**
+         * @return true if a replica for {@code endpoint} has already been added to this builder
+         */
+        public boolean containsEndpoint(InetAddressAndPort endpoint)
+        {
+            return mutable.asImmutableView().byEndpoint.containsKey(endpoint);
+        }
+    }
+
+    public static Builder builder(Range<Token> range)
+    {
+        return new Builder(range);
+    }
+    public static Builder builder(Range<Token> range, int capacity)
+    {
+        return new Builder(range, capacity);
+    }
+
+    /**
+     * @return an empty snapshot collection for {@code range}
+     */
+    public static EndpointsForRange empty(Range<Token> range)
+    {
+        return new EndpointsForRange(range, EMPTY_LIST, true, EMPTY_MAP);
+    }
+
+    /**
+     * @return a snapshot collection holding only {@code replica}, over that replica's own range
+     */
+    public static EndpointsForRange of(Replica replica)
+    {
+        // we only use ArrayList or ArrayList.SubList, to ensure callsites are bimorphic
+        ArrayList<Replica> one = new ArrayList<>(1);
+        one.add(replica);
+        // we can safely use singletonMap, as we only otherwise use LinkedHashMap
+        return new EndpointsForRange(replica.range(), one, true, Collections.unmodifiableMap(Collections.singletonMap(replica.endpoint(), replica)));
+    }
+
+    /**
+     * @throws IllegalArgumentException if no replicas are given (see {@link #copyOf})
+     */
+    public static EndpointsForRange of(Replica ... replicas)
+    {
+        return copyOf(Arrays.asList(replicas));
+    }
+
+    /**
+     * Builds a collection over the range of the first replica; with assertions enabled, every
+     * replica is checked to have that same range.
+     *
+     * @param replicas the replicas to copy; must be non-empty
+     * @return a collection holding the given replicas
+     * @throws IllegalArgumentException if {@code replicas} is empty
+     */
+    public static EndpointsForRange copyOf(Collection<Replica> replicas)
+    {
+        if (replicas.isEmpty())
+            throw new IllegalArgumentException("Collection must be non-empty to copy");
+        Range<Token> range = replicas.iterator().next().range();
+        assert all(replicas, r -> range.equals(r.range()));
+        return builder(range, replicas.size()).addAll(replicas).build();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsForToken.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsForToken.java b/src/java/org/apache/cassandra/locator/EndpointsForToken.java
new file mode 100644
index 0000000..f24c615
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/EndpointsForToken.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Preconditions;
+import org.apache.cassandra.dht.Token;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A ReplicaCollection where all Replica are required to cover a range that fully contains the token() defined in the builder().
+ * Endpoints are guaranteed to be unique; on construction, this is enforced unless optionally silenced (in which case
+ * only the first occurrence makes the cut).
+ *
+ * Note that the range-contains-token check is performed by {@link Mutable#add}; the single-replica
+ * factory {@link #of(Token, Replica)} constructs the collection directly and does not perform it.
+ */
+public class EndpointsForToken extends Endpoints<EndpointsForToken>
+{
+    private final Token token;
+
+    /**
+     * Creates a collection without a pre-built endpoint map: {@code null} is passed to the superclass
+     * as {@code byEndpoint}.
+     * NOTE(review): presumably Endpoints builds the map lazily when it is null - confirm in Endpoints.
+     */
+    private EndpointsForToken(Token token, List<Replica> list, boolean isSnapshot)
+    {
+        this(token, list, isSnapshot, null);
+    }
+
+    /**
+     * @param token      the token this collection is associated with; must not be null (checked by assertion)
+     * @param list       the replicas, handed to the superclass as-is
+     * @param isSnapshot handed to the superclass as-is
+     * @param byEndpoint endpoint to replica map, handed to the superclass as-is; may be null
+     */
+    EndpointsForToken(Token token, List<Replica> list, boolean isSnapshot, Map<InetAddressAndPort, Replica> byEndpoint)
+    {
+        super(list, isSnapshot, byEndpoint);
+        this.token = token;
+        assert token != null;
+    }
+
+    /**
+     * @return the token this collection was created for
+     */
+    public Token token()
+    {
+        return token;
+    }
+
+    /**
+     * @return a new, empty {@link Mutable} for the same token with the requested initial capacity
+     */
+    @Override
+    public Mutable newMutable(int initialCapacity)
+    {
+        return new Mutable(token, initialCapacity);
+    }
+
+    @Override
+    public EndpointsForToken self()
+    {
+        return this;
+    }
+
+    /**
+     * Wraps {@code subList} in a new collection for the same token; an empty sub-list yields
+     * {@link #empty(Token)} instead.
+     */
+    @Override
+    protected EndpointsForToken snapshot(List<Replica> subList)
+    {
+        if (subList.isEmpty()) return empty(token);
+        return new EndpointsForToken(token, subList, true);
+    }
+
+    /**
+     * A modifiable EndpointsForToken, backed by an ArrayList and a LinkedHashMap keyed by endpoint.
+     * Once {@link #asSnapshot()} has been called, further calls to {@link #add} are rejected.
+     */
+    public static class Mutable extends EndpointsForToken implements ReplicaCollection.Mutable<EndpointsForToken>
+    {
+        // set by asSnapshot(); add() refuses to run once this is true
+        boolean hasSnapshot;
+
+        public Mutable(Token token) { this(token, 0); }
+        public Mutable(Token token, int capacity) { super(token, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+
+        /**
+         * Adds a replica, keeping endpoints unique. If a replica with the same endpoint is already
+         * present, the existing entry is always kept and {@code ignoreConflict} decides what happens:
+         * NONE always throws; DUPLICATE is silent when the new replica equals the existing one and
+         * throws otherwise; ALL is always silent.
+         *
+         * @param replica        the replica to add; must not be null, and its range must contain the token
+         * @param ignoreConflict how to treat a replica whose endpoint is already present
+         * @throws IllegalStateException    if asSnapshot() has already been called
+         * @throws IllegalArgumentException if the replica's range does not contain the token, or on a
+         *                                  conflict that {@code ignoreConflict} does not permit
+         */
+        public void add(Replica replica, Conflict ignoreConflict)
+        {
+            if (hasSnapshot) throw new IllegalStateException("Cannot add to a Mutable after asSnapshot() has been called");
+            Preconditions.checkNotNull(replica);
+            if (!replica.range().contains(super.token))
+                throw new IllegalArgumentException("Replica " + replica + " does not contain " + super.token);
+
+            // putIfAbsent leaves an existing mapping untouched, so there is nothing to restore on conflict
+            Replica prev = super.byEndpoint.putIfAbsent(replica.endpoint(), replica);
+            if (prev == null)
+            {
+                list.add(replica);
+                return;
+            }
+
+            switch (ignoreConflict)
+            {
+                case DUPLICATE:
+                    if (prev.equals(replica))
+                        break;
+                    // deliberate fall-through: a different replica for the same endpoint is a conflict
+                case NONE:
+                    throw new IllegalArgumentException("Conflicting replica added (expected unique endpoints): " + replica + "; existing: " + prev);
+                case ALL:
+                    // silently keep the first occurrence
+            }
+        }
+
+        @Override
+        public Map<InetAddressAndPort, Replica> byEndpoint()
+        {
+            // our internal map is modifiable, but it is unsafe to modify the map externally
+            // it would be possible to implement a safe modifiable map, but it is probably not valuable
+            return Collections.unmodifiableMap(super.byEndpoint());
+        }
+
+        // wraps (does not copy) the backing list and map in a plain EndpointsForToken
+        private EndpointsForToken get(boolean isSnapshot)
+        {
+            return new EndpointsForToken(super.token, super.list, isSnapshot, Collections.unmodifiableMap(super.byEndpoint));
+        }
+
+        /**
+         * @return a read-only wrapper sharing this Mutable's backing list and map; this Mutable
+         *         remains open for further additions
+         */
+        public EndpointsForToken asImmutableView()
+        {
+            return get(false);
+        }
+
+        /**
+         * @return a wrapper sharing this Mutable's backing list and map; after this call,
+         *         {@link #add} throws IllegalStateException
+         */
+        public EndpointsForToken asSnapshot()
+        {
+            hasSnapshot = true;
+            return get(true);
+        }
+    }
+
+    /**
+     * Builder that accumulates replicas into a {@link Mutable} for a fixed token.
+     */
+    public static class Builder extends ReplicaCollection.Builder<EndpointsForToken, Mutable, EndpointsForToken.Builder>
+    {
+        public Builder(Token token) { this(token, 0); }
+        public Builder(Token token, int capacity) { super (new Mutable(token, capacity)); }
+    }
+
+    /**
+     * @return a new builder for {@code token} with an initial capacity of zero
+     */
+    public static Builder builder(Token token)
+    {
+        return new Builder(token);
+    }
+
+    /**
+     * @return a new builder for {@code token} whose backing list is created with {@code capacity}
+     */
+    public static Builder builder(Token token, int capacity)
+    {
+        return new Builder(token, capacity);
+    }
+
+    /**
+     * @return a collection for {@code token} with no replicas, backed by EMPTY_LIST and EMPTY_MAP
+     */
+    public static EndpointsForToken empty(Token token)
+    {
+        return new EndpointsForToken(token, EMPTY_LIST, true, EMPTY_MAP);
+    }
+
+    /**
+     * Creates a collection holding exactly one replica. Unlike {@link Mutable#add}, this does not
+     * check that the replica's range contains {@code token}.
+     */
+    public static EndpointsForToken of(Token token, Replica replica)
+    {
+        // we only use ArrayList or ArrayList.SubList, to ensure callsites are bimorphic
+        ArrayList<Replica> one = new ArrayList<>(1);
+        one.add(replica);
+        // we can safely use singletonMap, as we only otherwise use LinkedHashMap
+        return new EndpointsForToken(token, one, true, Collections.unmodifiableMap(Collections.singletonMap(replica.endpoint(), replica)));
+    }
+
+    /**
+     * Varargs convenience wrapper around {@link #copyOf(Token, Collection)}.
+     */
+    public static EndpointsForToken of(Token token, Replica ... replicas)
+    {
+        return copyOf(token, Arrays.asList(replicas));
+    }
+
+    /**
+     * Builds a collection for {@code token} from {@code replicas} via a builder sized to the input;
+     * an empty input yields {@link #empty(Token)}.
+     */
+    public static EndpointsForToken copyOf(Token token, Collection<Replica> replicas)
+    {
+        if (replicas.isEmpty()) return empty(token);
+        return builder(token, replicas.size()).addAll(replicas).build();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
index 63d333b..b7797b0 100644
--- a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
@@ -17,8 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.util.Collection;
-import java.util.List;
import java.util.Set;
/**
@@ -39,20 +37,20 @@ public interface IEndpointSnitch
*/
public String getDatacenter(InetAddressAndPort endpoint);
- /**
- * returns a new <tt>List</tt> sorted by proximity to the given endpoint
- */
- public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress);
+ default public String getDatacenter(Replica replica)
+ {
+ return getDatacenter(replica.endpoint());
+ }
/**
- * This method will sort the <tt>List</tt> by proximity to the given address.
+ * returns a <tt>ReplicaCollection</tt> of the same type as <tt>addresses</tt>, sorted by proximity to the given endpoint
*/
- public void sortByProximity(InetAddressAndPort address, List<InetAddressAndPort> addresses);
+ public <C extends ReplicaCollection<? extends C>> C sortedByProximity(final InetAddressAndPort address, C addresses);
/**
* compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
*/
- public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2);
+ public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2);
/**
* called after Gossiper instance exists immediately before it starts gossiping
@@ -63,7 +61,7 @@ public interface IEndpointSnitch
* Returns whether for a range query doing a query against merged is likely
* to be faster than 2 sequential queries, one against l1 followed by one against l2.
*/
- public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2);
+ public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2);
/**
* Determine if the datacenter or rack values in the current node's snitch conflict with those passed in parameters.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
index 38a1a49..a47c72a 100644
--- a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
+++ b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
@@ -25,6 +25,7 @@ import java.net.UnknownHostException;
import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FastByteOperations;
/**
@@ -191,9 +192,9 @@ public final class InetAddressAndPort implements Comparable<InetAddressAndPort>,
return InetAddressAndPort.getByAddress(InetAddress.getLoopbackAddress());
}
- public static InetAddressAndPort getLocalHost() throws UnknownHostException
+ public static InetAddressAndPort getLocalHost()
{
- return InetAddressAndPort.getByAddress(InetAddress.getLocalHost());
+ return FBUtilities.getLocalAddressAndPort();
}
public static void initializeDefaultPort(int port)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/LocalStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java
index a76fe96..41cc9b0 100644
--- a/src/java/org/apache/cassandra/locator/LocalStrategy.java
+++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java
@@ -17,12 +17,11 @@
*/
package org.apache.cassandra.locator;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.RingPosition;
import org.apache.cassandra.dht.Token;
@@ -30,32 +29,40 @@ import org.apache.cassandra.utils.FBUtilities;
public class LocalStrategy extends AbstractReplicationStrategy
{
+ private static final ReplicationFactor RF = ReplicationFactor.fullOnly(1);
+ private final EndpointsForRange replicas;
+
public LocalStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
{
super(keyspaceName, tokenMetadata, snitch, configOptions);
+ replicas = EndpointsForRange.of(
+ new Replica(FBUtilities.getBroadcastAddressAndPort(),
+ DatabaseDescriptor.getPartitioner().getMinimumToken(),
+ DatabaseDescriptor.getPartitioner().getMinimumToken(),
+ true
+ )
+ );
}
/**
- * We need to override this even if we override calculateNaturalEndpoints,
+ * We need to override this even if we override calculateNaturalReplicas,
* because the default implementation depends on token calculations but
* LocalStrategy may be used before tokens are set up.
*/
@Override
- public ArrayList<InetAddressAndPort> getNaturalEndpoints(RingPosition searchPosition)
+ public EndpointsForRange getNaturalReplicas(RingPosition searchPosition)
{
- ArrayList<InetAddressAndPort> l = new ArrayList<InetAddressAndPort>(1);
- l.add(FBUtilities.getBroadcastAddressAndPort());
- return l;
+ return replicas;
}
- public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+ public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
{
- return Collections.singletonList(FBUtilities.getBroadcastAddressAndPort());
+ return replicas;
}
- public int getReplicationFactor()
+ public ReplicationFactor getReplicationFactor()
{
- return 1;
+ return RF;
}
public void validateOptions() throws ConfigurationException
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org