You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:35:57 UTC
[05/18] cassandra git commit: Transient Replication and Cheap Quorums
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
index 447d504..374a760 100644
--- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.repair;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -42,6 +43,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.Schema;
@@ -64,6 +67,9 @@ public class PendingAntiCompactionTest
{
private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class);
private static final Collection<Range<Token>> FULL_RANGE;
+ private static final Collection<Range<Token>> NO_RANGES = Collections.emptyList();
+ private static InetAddressAndPort local;
+
static
{
DatabaseDescriptor.daemonInitialization();
@@ -77,9 +83,10 @@ public class PendingAntiCompactionTest
private ColumnFamilyStore cfs;
@BeforeClass
- public static void setupClass()
+ public static void setupClass() throws Throwable
{
SchemaLoader.prepareServer();
+ local = InetAddressAndPort.getByName("127.0.0.1");
}
@Before
@@ -89,6 +96,7 @@ public class PendingAntiCompactionTest
cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY KEY, v INT)", ks, tbl), ks).build();
SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm);
cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
+
}
private void makeSSTables(int num)
@@ -105,7 +113,7 @@ public class PendingAntiCompactionTest
private static class InstrumentedAcquisitionCallback extends PendingAntiCompaction.AcquisitionCallback
{
- public InstrumentedAcquisitionCallback(UUID parentRepairSession, Collection<Range<Token>> ranges)
+ public InstrumentedAcquisitionCallback(UUID parentRepairSession, RangesAtEndpoint ranges)
{
super(parentRepairSession, ranges);
}
@@ -155,7 +163,7 @@ public class PendingAntiCompactionTest
ExecutorService executor = Executors.newSingleThreadExecutor();
try
{
- pac = new PendingAntiCompaction(sessionID, tables, ranges, executor);
+ pac = new PendingAntiCompaction(sessionID, tables, atEndpoint(ranges, NO_RANGES), executor);
pac.run().get();
}
finally
@@ -217,7 +225,7 @@ public class PendingAntiCompactionTest
Assert.assertTrue(repaired.intersects(FULL_RANGE));
Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
- repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 1, null);
+ repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 1, null, false);
repaired.reloadSSTableMetadata();
PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID());
@@ -243,7 +251,7 @@ public class PendingAntiCompactionTest
Assert.assertTrue(repaired.intersects(FULL_RANGE));
Assert.assertTrue(unrepaired.intersects(FULL_RANGE));
- repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 0, UUIDGen.getTimeUUID());
+ repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 0, UUIDGen.getTimeUUID(), false);
repaired.reloadSSTableMetadata();
Assert.assertTrue(repaired.isPendingRepair());
@@ -284,7 +292,7 @@ public class PendingAntiCompactionTest
PendingAntiCompaction.AcquireResult result = acquisitionCallable.call();
Assert.assertNotNull(result);
- InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
+ InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES));
Assert.assertTrue(cb.submittedCompactions.isEmpty());
cb.apply(Lists.newArrayList(result));
@@ -308,7 +316,7 @@ public class PendingAntiCompactionTest
Assert.assertNotNull(result);
Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state());
- InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
+ InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, Collections.emptyList()));
Assert.assertTrue(cb.submittedCompactions.isEmpty());
cb.apply(Lists.newArrayList(result, null));
@@ -333,7 +341,7 @@ public class PendingAntiCompactionTest
ColumnFamilyStore cfs2 = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system", "peers").id);
PendingAntiCompaction.AcquireResult fakeResult = new PendingAntiCompaction.AcquireResult(cfs2, null, null);
- InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE);
+ InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES));
Assert.assertTrue(cb.submittedCompactions.isEmpty());
cb.apply(Lists.newArrayList(result, fakeResult));
@@ -359,8 +367,19 @@ public class PendingAntiCompactionTest
true,0,
true,
PreviewKind.NONE);
- CompactionManager.instance.performAnticompaction(result.cfs, FULL_RANGE, result.refs, result.txn,
- ActiveRepairService.UNREPAIRED_SSTABLE, sessionID, sessionID);
+ CompactionManager.instance.performAnticompaction(result.cfs, atEndpoint(FULL_RANGE, NO_RANGES), result.refs, result.txn, sessionID);
+
+ }
+
+ /** Wraps the given ranges as replicas of {@code local}: {@code full} entries are added with the flag {@code true}, {@code trans} entries with {@code false}. */
+ private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
+ {
+ RangesAtEndpoint.Builder replicas = RangesAtEndpoint.builder(local);
+ // ranges of the first collection go in first, in iteration order
+ full.forEach(fullRange -> replicas.add(new Replica(local, fullRange, true)));
+ // followed by the ranges of the second collection
+ trans.forEach(transRange -> replicas.add(new Replica(local, transRange, false)));
+ return replicas.build();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
index 8256ac6..5e44346 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java
@@ -114,6 +114,7 @@ public class CassandraOutgoingFileTest
List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(4)),
new Range<>(getTokenAtIndex(2), getTokenAtIndex(6)),
new Range<>(getTokenAtIndex(5), sstable.last.getToken()));
+ requestedRanges = Range.normalize(requestedRanges);
CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(),
sstable.getPositionsForRanges(requestedRanges),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
index 86018af..b597bfe 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java
@@ -33,6 +33,8 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.RangesAtEndpoint;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -118,10 +120,10 @@ public class CassandraStreamManagerTest
return Iterables.getOnlyElement(diff);
}
- private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException
+ private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException
{
Descriptor descriptor = sstable.descriptor;
- descriptor.getMetadataSerializer().mutateRepaired(descriptor, repairedAt, pendingRepair);
+ descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, repairedAt, pendingRepair, isTransient);
sstable.reloadSSTableMetadata();
}
@@ -141,7 +143,7 @@ public class CassandraStreamManagerTest
private Set<SSTableReader> getReadersForRange(Range<Token> range)
{
Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(NO_PENDING_REPAIR),
- Collections.singleton(range),
+ RangesAtEndpoint.toDummyList(Collections.singleton(range)),
NO_PENDING_REPAIR,
PreviewKind.NONE);
return sstablesFromStreams(streams);
@@ -151,7 +153,7 @@ public class CassandraStreamManagerTest
{
IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
- Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(pendingRepair), ranges, pendingRepair, PreviewKind.NONE);
+ Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(pendingRepair), RangesAtEndpoint.toDummyList(ranges), pendingRepair, PreviewKind.NONE);
return sstablesFromStreams(streams);
}
@@ -167,9 +169,9 @@ public class CassandraStreamManagerTest
UUID pendingRepair = UUIDGen.getTimeUUID();
long repairedAt = System.currentTimeMillis();
- mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair);
- mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
- mutateRepaired(sstable4, repairedAt, NO_PENDING_REPAIR);
+ mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, false);
+ mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID(), false);
+ mutateRepaired(sstable4, repairedAt, NO_PENDING_REPAIR, false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/streaming/StreamRequestTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/StreamRequestTest.java b/test/unit/org/apache/cassandra/db/streaming/StreamRequestTest.java
new file mode 100644
index 0000000..2a6cb65
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/StreamRequestTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.StreamRequest;
+
+/** Round-trip serialization test for {@link StreamRequest}. */
+public class StreamRequestTest
+{
+ private static InetAddressAndPort local;
+ private final String ks = "keyspace";
+ private final int version = MessagingService.current_version;
+
+ @BeforeClass
+ public static void setUp() throws Throwable
+ {
+ DatabaseDescriptor.daemonInitialization();
+ local = InetAddressAndPort.getByName("127.0.0.1");
+ }
+
+ /** Serializes a request holding full and transient ranges and compares each decoded field with the original. */
+ @Test
+ public void serializationRoundTrip() throws Throwable
+ {
+ StreamRequest orig = new StreamRequest(ks,
+ atEndpoint(Arrays.asList(range(1, 2), range(3, 4), range(5, 6)),
+ Collections.emptyList()),
+ atEndpoint(Collections.emptyList(),
+ Arrays.asList(range(5, 6), range(7, 8))),
+ Arrays.asList("a", "b", "c"));
+
+ int expectedSize = (int) StreamRequest.serializer.serializedSize(orig, version);
+ try (DataOutputBuffer out = new DataOutputBuffer(expectedSize))
+ {
+ StreamRequest.serializer.serialize(orig, out, version);
+ // the size reported by the serializer must match the limit of the produced buffer
+ Assert.assertEquals(expectedSize, out.buffer().limit());
+ try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
+ {
+ StreamRequest decoded = StreamRequest.serializer.deserialize(in, version);
+ Assert.assertEquals(orig.keyspace, decoded.keyspace);
+ Assert.assertEquals(orig.full, decoded.full);
+ Assert.assertEquals(orig.transientReplicas, decoded.transientReplicas);
+ Assert.assertEquals(orig.columnFamilies, decoded.columnFamilies);
+ }
+ }
+ }
+
+ /** Builds the ranges held by {@code local}: {@code full} entries are added with the Replica flag {@code true}, {@code trans} entries with {@code false}. */
+ private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans)
+ {
+ RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local);
+ full.forEach(range -> builder.add(new Replica(local, range, true)));
+ trans.forEach(range -> builder.add(new Replica(local, range, false)));
+ return builder.build();
+ }
+
+ /** Range between the byte tokens of the decimal strings of {@code l} and {@code r}; UTF-8 is pinned so the bytes do not depend on the platform default charset. */
+ private static Range<Token> range(int l, int r)
+ {
+ return new Range<>(new ByteOrderedPartitioner.BytesToken(Integer.toString(l).getBytes(java.nio.charset.StandardCharsets.UTF_8)),
+ new ByteOrderedPartitioner.BytesToken(Integer.toString(r).getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
index ec0f6b1..7eebef7 100644
--- a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
+++ b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import org.junit.Assert;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken;
@@ -76,12 +77,12 @@ public class ViewUtilsTest
KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
Schema.instance.load(meta);
- Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
- new StringToken("CA"),
- new StringToken("BB"));
+ Optional<Replica> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+ new StringToken("CA"),
+ new StringToken("BB"));
Assert.assertTrue(naturalEndpoint.isPresent());
- Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.2"), naturalEndpoint.get());
+ Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.2"), naturalEndpoint.get().endpoint());
}
@@ -109,12 +110,12 @@ public class ViewUtilsTest
KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
Schema.instance.load(meta);
- Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
- new StringToken("CA"),
- new StringToken("BB"));
+ Optional<Replica> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+ new StringToken("CA"),
+ new StringToken("BB"));
Assert.assertTrue(naturalEndpoint.isPresent());
- Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.1"), naturalEndpoint.get());
+ Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.1"), naturalEndpoint.get().endpoint());
}
@Test
@@ -141,9 +142,9 @@ public class ViewUtilsTest
KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
Schema.instance.load(meta);
- Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
- new StringToken("AB"),
- new StringToken("BB"));
+ Optional<Replica> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+ new StringToken("AB"),
+ new StringToken("BB"));
Assert.assertFalse(naturalEndpoint.isPresent());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index f11cb62..8ae6853 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -18,18 +18,20 @@
package org.apache.cassandra.dht;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.net.UnknownHostException;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.UUID;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.dht.RangeStreamer.FetchReplica;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.junit.AfterClass;
@@ -41,6 +43,7 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.tokenallocator.TokenAllocation;
@@ -60,6 +63,7 @@ public class BootStrapperTest
{
static IPartitioner oldPartitioner;
+ static Predicate<Replica> originalAlivePredicate = RangeStreamer.ALIVE_PREDICATE;
@BeforeClass
public static void setup() throws ConfigurationException
{
@@ -68,12 +72,14 @@ public class BootStrapperTest
SchemaLoader.startGossiper();
SchemaLoader.prepareServer();
SchemaLoader.schemaDefinition("BootStrapperTest");
+ RangeStreamer.ALIVE_PREDICATE = Predicates.alwaysTrue();
}
@AfterClass
public static void tearDown()
{
DatabaseDescriptor.setPartitionerUnsafe(oldPartitioner);
+ RangeStreamer.ALIVE_PREDICATE = originalAlivePredicate;
}
@Test
@@ -82,7 +88,7 @@ public class BootStrapperTest
final int[] clusterSizes = new int[] { 1, 3, 5, 10, 100};
for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
{
- int replicationFactor = Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor();
+ int replicationFactor = Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor().allReplicas;
for (int clusterSize : clusterSizes)
if (clusterSize >= replicationFactor)
testSourceTargetComputation(keyspaceName, clusterSize, replicationFactor);
@@ -115,21 +121,25 @@ public class BootStrapperTest
public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
};
s.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector));
+ assertNotNull(Keyspace.open(keyspaceName));
s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));
- Collection<Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch = s.toFetch().get(keyspaceName);
- // Check we get get RF new ranges in total
- Set<Range<Token>> ranges = new HashSet<>();
- for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> e : toFetch)
- ranges.addAll(e.getValue());
+ Collection<Multimap<InetAddressAndPort, FetchReplica>> toFetch = s.toFetch().get(keyspaceName);
- assertEquals(replicationFactor, ranges.size());
+ // Check we get RF new ranges in total
+ long rangesCount = toFetch.stream()
+ .map(Multimap::values)
+ .flatMap(Collection::stream)
+ .map(f -> f.remote)
+ .map(Replica::range)
+ .count();
+ assertEquals(replicationFactor, rangesCount);
// there isn't any point in testing the size of these collections for any specific size. When a random partitioner
// is used, they will vary.
- assert toFetch.iterator().next().getValue().size() > 0;
- assert !toFetch.iterator().next().getKey().equals(myEndpoint);
+ assert toFetch.stream().map(Multimap::values).flatMap(Collection::stream).count() > 0;
+ assert toFetch.stream().map(Multimap::keySet).flatMap(Collection::stream).noneMatch(myEndpoint::equals); // flatMap so source endpoints, not Stream objects, are compared with myEndpoint
return s;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
index 78e87c1..07d6377 100644
--- a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
@@ -25,8 +25,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-import com.google.common.collect.HashMultimap;
+import com.google.common.base.Predicate;
import com.google.common.collect.Multimap;
+import org.apache.cassandra.locator.EndpointsByRange;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -34,6 +35,8 @@ import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaUtils;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -78,14 +81,14 @@ public class RangeFetchMapCalculatorTest
@Test
public void testWithSingleSource() throws Exception
{
- Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+ EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.4");
addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.5");
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
@@ -95,14 +98,14 @@ public class RangeFetchMapCalculatorTest
@Test
public void testWithNonOverlappingSource() throws Exception
{
- Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+ EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.4");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.5", "127.0.0.6");
addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.7", "127.0.0.8");
addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.9", "127.0.0.10");
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
@@ -112,12 +115,12 @@ public class RangeFetchMapCalculatorTest
@Test
public void testWithRFThreeReplacement() throws Exception
{
- Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+ EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2", "127.0.0.3");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3", "127.0.0.4");
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
@@ -128,14 +131,14 @@ public class RangeFetchMapCalculatorTest
@Test
public void testForMultipleRoundsComputation() throws Exception
{
- Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+ EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.3");
addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.3", "127.0.0.2");
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
@@ -150,14 +153,14 @@ public class RangeFetchMapCalculatorTest
@Test
public void testForMultipleRoundsComputationWithLocalHost() throws Exception
{
- Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+ EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1", "127.0.0.2");
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
@@ -170,14 +173,14 @@ public class RangeFetchMapCalculatorTest
@Test
public void testForEmptyGraph() throws Exception
{
- Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+ EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1");
addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1");
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
//All ranges map to local host so we will not stream anything.
assertTrue(map.isEmpty());
@@ -186,31 +189,28 @@ public class RangeFetchMapCalculatorTest
@Test
public void testWithNoSourceWithLocal() throws Exception
{
- Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+ EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.5");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
//Return false for all except 127.0.0.5
- final RangeStreamer.ISourceFilter filter = new RangeStreamer.ISourceFilter()
+ final Predicate<Replica> filter = replica ->
{
- public boolean shouldInclude(InetAddressAndPort endpoint)
+ try
{
- try
- {
- if (endpoint.equals(InetAddressAndPort.getByName("127.0.0.5")))
- return false;
- else
- return true;
- }
- catch (UnknownHostException e)
- {
+ if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.5")))
+ return false;
+ else
return true;
- }
+ }
+ catch (UnknownHostException e)
+ {
+ return true;
}
};
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Arrays.asList(filter), "Test");
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(filter), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
@@ -225,32 +225,26 @@ public class RangeFetchMapCalculatorTest
@Test (expected = IllegalStateException.class)
public void testWithNoLiveSource() throws Exception
{
- Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+ EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.5");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
- final RangeStreamer.ISourceFilter allDeadFilter = new RangeStreamer.ISourceFilter()
- {
- public boolean shouldInclude(InetAddressAndPort endpoint)
- {
- return false;
- }
- };
+ final Predicate<Replica> allDeadFilter = replica -> false;
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Arrays.asList(allDeadFilter), "Test");
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(allDeadFilter), "Test");
calculator.getRangeFetchMap();
}
@Test
public void testForLocalDC() throws Exception
{
- Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+ EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.3", "127.0.0.53");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1", "127.0.0.3", "127.0.0.57");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59", "127.0.0.61");
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<>(), "Test");
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), new ArrayList<>(), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
Assert.assertEquals(2, map.asMap().size());
@@ -263,31 +257,28 @@ public class RangeFetchMapCalculatorTest
@Test
public void testForRemoteDC() throws Exception
{
- Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+ EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
//Reject only 127.0.0.3 and accept everyone else
- final RangeStreamer.ISourceFilter localHostFilter = new RangeStreamer.ISourceFilter()
+ final Predicate<Replica> localHostFilter = replica ->
{
- public boolean shouldInclude(InetAddressAndPort endpoint)
+ try
{
- try
- {
- if (endpoint.equals(InetAddressAndPort.getByName("127.0.0.3")))
- return false;
- else
- return true;
- }
- catch (UnknownHostException e)
- {
+ if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
+ return false;
+ else
return true;
- }
+ }
+ catch (UnknownHostException e)
+ {
+ return true;
}
};
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Arrays.asList(localHostFilter), "Test");
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(localHostFilter), "Test");
Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap();
validateRange(rangesWithSources, map);
Assert.assertEquals(3, map.asMap().size());
@@ -301,14 +292,14 @@ public class RangeFetchMapCalculatorTest
@Test
public void testTrivialRanges() throws UnknownHostException
{
- Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+ EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
// add non-trivial ranges
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
// and a trivial one:
addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Collections.emptyList(), "Test");
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test");
Multimap<InetAddressAndPort, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges();
Multimap<InetAddressAndPort, Range<Token>> trivialMap = calculator.getRangeFetchMapForTrivialRanges(optMap);
assertTrue(trivialMap.get(InetAddressAndPort.getByName("127.0.0.3")).contains(generateTrivialRange(1,10)) ^
@@ -319,7 +310,7 @@ public class RangeFetchMapCalculatorTest
@Test(expected = IllegalStateException.class)
public void testNotEnoughEndpointsForTrivialRange() throws UnknownHostException
{
- Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create();
+ EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable();
// add non-trivial ranges
addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
@@ -327,23 +318,20 @@ public class RangeFetchMapCalculatorTest
// and a trivial one:
addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3");
- RangeStreamer.ISourceFilter filter = new RangeStreamer.ISourceFilter()
+ Predicate<Replica> filter = replica ->
{
- public boolean shouldInclude(InetAddressAndPort endpoint)
+ try
{
- try
- {
- if (endpoint.equals(InetAddressAndPort.getByName("127.0.0.3")))
- return false;
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException(e);
- }
- return true;
+ if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3")))
+ return false;
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
}
+ return true;
};
- RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Collections.singleton(filter), "Test");
+ RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.singleton(filter), "Test");
Multimap<InetAddressAndPort, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges();
Multimap<InetAddressAndPort, Range<Token>> trivialMap = calculator.getRangeFetchMapForTrivialRanges(optMap);
@@ -355,27 +343,29 @@ public class RangeFetchMapCalculatorTest
assertTrue(result.containsAll(expected));
}
- private void validateRange(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> result)
+ private void validateRange(EndpointsByRange.Mutable rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> result)
{
for (Map.Entry<InetAddressAndPort, Range<Token>> entry : result.entries())
{
- assertTrue(rangesWithSources.get(entry.getValue()).contains(entry.getKey()));
+ assertTrue(rangesWithSources.get(entry.getValue()).endpoints().contains(entry.getKey()));
}
}
- private void addNonTrivialRangeAndSources(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
+ private void addNonTrivialRangeAndSources(EndpointsByRange.Mutable rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
{
for (InetAddressAndPort endpoint : makeAddrs(hosts))
{
- rangesWithSources.put(generateNonTrivialRange(left, right), endpoint);
+ Range<Token> range = generateNonTrivialRange(left, right);
+ rangesWithSources.put(range, Replica.fullReplica(endpoint, range));
}
}
- private void addTrivialRangeAndSources(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
+ private void addTrivialRangeAndSources(EndpointsByRange.Mutable rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
{
for (InetAddressAndPort endpoint : makeAddrs(hosts))
{
- rangesWithSources.put(generateTrivialRange(left, right), endpoint);
+ Range<Token> range = generateTrivialRange(left, right);
+ rangesWithSources.put(range, Replica.fullReplica(endpoint, range));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/RangeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java b/test/unit/org/apache/cassandra/dht/RangeTest.java
index 495979e..36a8da1 100644
--- a/test/unit/org/apache/cassandra/dht/RangeTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeTest.java
@@ -642,7 +642,7 @@ public class RangeTest
Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges);
for (Token t : tokensToTest)
{
- if (checker.contains(t) != Range.isInRanges(t, ranges)) // avoid running Joiner.on(..) every iteration
+ if (checker.test(t) != Range.isInRanges(t, ranges)) // avoid running Joiner.on(..) every iteration
fail(String.format("This should never flap! If it does, it is a bug (ranges = %s, token = %s)", Joiner.on(",").join(ranges), t));
}
}
@@ -653,11 +653,11 @@ public class RangeTest
{
List<Range<Token>> ranges = asList(r(Long.MIN_VALUE, Long.MIN_VALUE + 1), r(Long.MAX_VALUE - 1, Long.MAX_VALUE));
Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges);
- assertFalse(checker.contains(t(Long.MIN_VALUE)));
- assertTrue(checker.contains(t(Long.MIN_VALUE + 1)));
- assertFalse(checker.contains(t(0)));
- assertFalse(checker.contains(t(Long.MAX_VALUE - 1)));
- assertTrue(checker.contains(t(Long.MAX_VALUE)));
+ assertFalse(checker.test(t(Long.MIN_VALUE)));
+ assertTrue(checker.test(t(Long.MIN_VALUE + 1)));
+ assertFalse(checker.test(t(0)));
+ assertFalse(checker.test(t(Long.MAX_VALUE - 1)));
+ assertTrue(checker.test(t(Long.MAX_VALUE)));
}
private static Range<Token> r(long left, long right)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/SplitterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/SplitterTest.java b/test/unit/org/apache/cassandra/dht/SplitterTest.java
index 322a57c..c591499 100644
--- a/test/unit/org/apache/cassandra/dht/SplitterTest.java
+++ b/test/unit/org/apache/cassandra/dht/SplitterTest.java
@@ -62,13 +62,54 @@ public class SplitterTest
randomSplitTestVNodes(new Murmur3Partitioner());
}
+ @Test
+ public void testWithWeight()
+ {
+ List<Splitter.WeightedRange> ranges = new ArrayList<>();
+ ranges.add(new Splitter.WeightedRange(1.0, t(0, 10)));
+ ranges.add(new Splitter.WeightedRange(1.0, t(20, 30)));
+ ranges.add(new Splitter.WeightedRange(0.5, t(40, 60)));
+
+ List<Splitter.WeightedRange> ranges2 = new ArrayList<>();
+ ranges2.add(new Splitter.WeightedRange(1.0, t(0, 10)));
+ ranges2.add(new Splitter.WeightedRange(1.0, t(20, 30)));
+ ranges2.add(new Splitter.WeightedRange(1.0, t(40, 50)));
+ IPartitioner partitioner = Murmur3Partitioner.instance;
+ Splitter splitter = partitioner.splitter().get();
+
+ assertEquals(splitter.splitOwnedRanges(2, ranges, false), splitter.splitOwnedRanges(2, ranges2, false));
+ }
+
+ @Test
+ public void testWithWeight2()
+ {
+ List<Splitter.WeightedRange> ranges = new ArrayList<>();
+ ranges.add(new Splitter.WeightedRange(0.2, t(0, 10)));
+ ranges.add(new Splitter.WeightedRange(1.0, t(20, 30)));
+ ranges.add(new Splitter.WeightedRange(1.0, t(40, 50)));
+
+ List<Splitter.WeightedRange> ranges2 = new ArrayList<>();
+ ranges2.add(new Splitter.WeightedRange(1.0, t(0, 2)));
+ ranges2.add(new Splitter.WeightedRange(1.0, t(20, 30)));
+ ranges2.add(new Splitter.WeightedRange(1.0, t(40, 50)));
+ IPartitioner partitioner = Murmur3Partitioner.instance;
+ Splitter splitter = partitioner.splitter().get();
+
+ assertEquals(splitter.splitOwnedRanges(2, ranges, false), splitter.splitOwnedRanges(2, ranges2, false));
+ }
+
+ private Range<Token> t(long left, long right)
+ {
+ return new Range<>(new Murmur3Partitioner.LongToken(left), new Murmur3Partitioner.LongToken(right));
+ }
+
private static void randomSplitTestNoVNodes(IPartitioner partitioner)
{
Splitter splitter = getSplitter(partitioner);
Random r = new Random();
for (int i = 0; i < 10000; i++)
{
- List<Range<Token>> localRanges = generateLocalRanges(1, r.nextInt(4) + 1, splitter, r, partitioner instanceof RandomPartitioner);
+ List<Splitter.WeightedRange> localRanges = generateLocalRanges(1, r.nextInt(4) + 1, splitter, r, partitioner instanceof RandomPartitioner);
List<Token> boundaries = splitter.splitOwnedRanges(r.nextInt(9) + 1, localRanges, false);
assertTrue("boundaries = " + boundaries + " ranges = " + localRanges, assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, true));
}
@@ -84,14 +125,14 @@ public class SplitterTest
int numTokens = 172 + r.nextInt(128);
int rf = r.nextInt(4) + 2;
int parts = r.nextInt(5) + 1;
- List<Range<Token>> localRanges = generateLocalRanges(numTokens, rf, splitter, r, partitioner instanceof RandomPartitioner);
+ List<Splitter.WeightedRange> localRanges = generateLocalRanges(numTokens, rf, splitter, r, partitioner instanceof RandomPartitioner);
List<Token> boundaries = splitter.splitOwnedRanges(parts, localRanges, true);
if (!assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, false))
fail(String.format("Could not split %d tokens with rf=%d into %d parts (localRanges=%s, boundaries=%s)", numTokens, rf, parts, localRanges, boundaries));
}
}
- private static boolean assertRangeSizeEqual(List<Range<Token>> localRanges, List<Token> tokens, IPartitioner partitioner, Splitter splitter, boolean splitIndividualRanges)
+ private static boolean assertRangeSizeEqual(List<Splitter.WeightedRange> localRanges, List<Token> tokens, IPartitioner partitioner, Splitter splitter, boolean splitIndividualRanges)
{
Token start = partitioner.getMinimumToken();
List<BigInteger> splits = new ArrayList<>();
@@ -119,27 +160,27 @@ public class SplitterTest
return allBalanced;
}
- private static BigInteger sumOwnedBetween(List<Range<Token>> localRanges, Token start, Token end, Splitter splitter, boolean splitIndividualRanges)
+ private static BigInteger sumOwnedBetween(List<Splitter.WeightedRange> localRanges, Token start, Token end, Splitter splitter, boolean splitIndividualRanges)
{
BigInteger sum = BigInteger.ZERO;
- for (Range<Token> range : localRanges)
+ for (Splitter.WeightedRange range : localRanges)
{
if (splitIndividualRanges)
{
- Set<Range<Token>> intersections = new Range<>(start, end).intersectionWith(range);
+ Set<Range<Token>> intersections = new Range<>(start, end).intersectionWith(range.range());
for (Range<Token> intersection : intersections)
sum = sum.add(splitter.valueForToken(intersection.right).subtract(splitter.valueForToken(intersection.left)));
}
else
{
- if (new Range<>(start, end).contains(range.left))
- sum = sum.add(splitter.valueForToken(range.right).subtract(splitter.valueForToken(range.left)));
+ if (new Range<>(start, end).contains(range.left()))
+ sum = sum.add(splitter.valueForToken(range.right()).subtract(splitter.valueForToken(range.left())));
}
}
return sum;
}
- private static List<Range<Token>> generateLocalRanges(int numTokens, int rf, Splitter splitter, Random r, boolean randomPartitioner)
+ private static List<Splitter.WeightedRange> generateLocalRanges(int numTokens, int rf, Splitter splitter, Random r, boolean randomPartitioner)
{
int localTokens = numTokens * rf;
List<Token> randomTokens = new ArrayList<>();
@@ -152,11 +193,11 @@ public class SplitterTest
Collections.sort(randomTokens);
- List<Range<Token>> localRanges = new ArrayList<>(localTokens);
+ List<Splitter.WeightedRange> localRanges = new ArrayList<>(localTokens);
for (int i = 0; i < randomTokens.size() - 1; i++)
{
assert randomTokens.get(i).compareTo(randomTokens.get(i + 1)) < 0;
- localRanges.add(new Range<>(randomTokens.get(i), randomTokens.get(i + 1)));
+ localRanges.add(new Splitter.WeightedRange(1.0, new Range<>(randomTokens.get(i), randomTokens.get(i + 1))));
i++;
}
return localRanges;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index bf71c09..34096a7 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.dht;
import java.util.Collections;
+import org.apache.cassandra.locator.RangesAtEndpoint;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -53,7 +54,7 @@ public class StreamStateStoreTest
InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
- session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf"));
+ session.addStreamRequest("keyspace1", RangesAtEndpoint.toDummyList(Collections.singleton(range)), RangesAtEndpoint.toDummyList(Collections.emptyList()), Collections.singleton("cf"));
StreamStateStore store = new StreamStateStore();
// session complete event that is not completed makes data not available for keyspace/ranges
@@ -74,7 +75,7 @@ public class StreamStateStoreTest
// add different range within the same keyspace
Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200"));
session = new StreamSession(StreamOperation.BOOTSTRAP, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE);
- session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf"));
+ session.addStreamRequest("keyspace1", RangesAtEndpoint.toDummyList(Collections.singleton(range2)), RangesAtEndpoint.toDummyList(Collections.emptyList()), Collections.singleton("cf"));
session.state(StreamSession.State.COMPLETE);
store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
index 833ee8b..0710945 100644
--- a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
+++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
@@ -62,7 +62,7 @@ public class PendingRangeCalculatorServiceTest
@BMRule(name = "Block pending range calculation",
targetClass = "TokenMetadata",
targetMethod = "calculatePendingRanges",
- targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges",
+ targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressReplicas",
action = "org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()")
public void testDelayedResponse() throws UnknownHostException, InterruptedException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index fccb344..9e3594b 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -69,7 +69,7 @@ public class BigTableWriterTest extends AbstractTransactionalTest
private TestableBTW(Descriptor desc)
{
- this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null,
+ this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null, false,
new SerializationHeader(true, cfs.metadata(),
cfs.metadata().regularAndStaticColumns(),
EncodingStats.NO_STATS)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index dbb929d..faf46bc 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -649,7 +649,7 @@ public class CQLSSTableWriterTest
public void init(String keyspace)
{
this.keyspace = keyspace;
- for (Range<Token> range : StorageService.instance.getLocalRanges(ks))
+ for (Range<Token> range : StorageService.instance.getLocalReplicas(ks).ranges())
addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 044cd9f..c4ccf48 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -176,21 +176,26 @@ public class LegacySSTableTest
{
for (SSTableReader sstable : cfs.getLiveSSTables())
{
- sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1234, NO_PENDING_REPAIR);
+ sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 1234, NO_PENDING_REPAIR, false);
sstable.reloadSSTableMetadata();
assertEquals(1234, sstable.getRepairedAt());
if (sstable.descriptor.version.hasPendingRepair())
assertEquals(NO_PENDING_REPAIR, sstable.getPendingRepair());
}
+ boolean isTransient = false;
for (SSTableReader sstable : cfs.getLiveSSTables())
{
UUID random = UUID.randomUUID();
- sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, UNREPAIRED_SSTABLE, random);
+ sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, UNREPAIRED_SSTABLE, random, isTransient);
sstable.reloadSSTableMetadata();
assertEquals(UNREPAIRED_SSTABLE, sstable.getRepairedAt());
if (sstable.descriptor.version.hasPendingRepair())
assertEquals(random, sstable.getPendingRepair());
+ if (sstable.descriptor.version.hasIsTransient())
+ assertEquals(isTransient, sstable.isTransient());
+
+ isTransient = !isTransient;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index 8509115..5d40f8c 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -31,14 +31,13 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.KeyspaceParams;
@@ -109,8 +108,8 @@ public class SSTableLoaderTest
public void init(String keyspace)
{
this.keyspace = keyspace;
- for (Range<Token> range : StorageService.instance.getLocalRanges(KEYSPACE1))
- addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort());
+ for (Replica replica : StorageService.instance.getLocalReplicas(KEYSPACE1))
+ addRangeForEndpoint(replica.range(), FBUtilities.getBroadcastAddressAndPort());
}
public TableMetadataRef getTableMetadata(String tableName)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 6412ef4..7c47c8b 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -877,7 +877,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
File dir = cfs.getDirectories().getDirectoryForNewSSTables();
Descriptor desc = cfs.newSSTableDescriptor(dir);
- try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, null, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
+ try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, null, false, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS)))
{
int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
for ( ; i < end ; i++)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 441a3b9..731cee2 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -219,7 +219,7 @@ public class SSTableUtils
TableMetadata metadata = Schema.instance.getTableMetadata(ksname, cfname);
ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id);
SerializationHeader header = appender.header();
- SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, 0, header);
+ SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, false, 0, header);
while (appender.append(writer)) { /* pass */ }
Collection<SSTableReader> readers = writer.finish(true);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
index 5d62cdb..31d0b89 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable;
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.UUID;
import org.junit.Test;
@@ -32,9 +33,12 @@ import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.FBUtilities;
import static junit.framework.Assert.fail;
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -245,4 +249,60 @@ public class SSTableWriterTest extends SSTableWriterTestBase
}
}
+ private static void assertValidRepairMetadata(long repairedAt, UUID pendingRepair, boolean isTransient)
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_SMALL_MAX_VALUE);
+ File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+ LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+
+ try (SSTableWriter writer = getWriter(cfs, dir, txn, repairedAt, pendingRepair, isTransient))
+ {
+ // expected
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new AssertionError("Unexpected IllegalArgumentException", e);
+ }
+
+ txn.abort();
+ LifecycleTransaction.waitForDeletions();
+ }
+
+ private static void assertInvalidRepairMetadata(long repairedAt, UUID pendingRepair, boolean isTransient)
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_SMALL_MAX_VALUE);
+ File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+ LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+
+ try (SSTableWriter writer = getWriter(cfs, dir, txn, repairedAt, pendingRepair, isTransient))
+ {
+ fail("Expected IllegalArgumentException");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // expected
+ }
+
+ txn.abort();
+ LifecycleTransaction.waitForDeletions();
+ }
+
+ /**
+ * It should only be possible to create sstables marked transient that also have a pending repair
+ */
+ @Test
+ public void testRepairMetadataValidation()
+ {
+ assertValidRepairMetadata(UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, false);
+ assertValidRepairMetadata(1, NO_PENDING_REPAIR, false);
+ assertValidRepairMetadata(UNREPAIRED_SSTABLE, UUID.randomUUID(), false);
+ assertValidRepairMetadata(UNREPAIRED_SSTABLE, UUID.randomUUID(), true);
+
+ assertInvalidRepairMetadata(UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, true);
+ assertInvalidRepairMetadata(1, UUID.randomUUID(), false);
+ assertInvalidRepairMetadata(1, NO_PENDING_REPAIR, true);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
index d42c49b..962e1a1 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -48,6 +49,7 @@ import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class SSTableWriterTestBase extends SchemaLoader
{
@@ -161,10 +163,15 @@ public class SSTableWriterTestBase extends SchemaLoader
assertFalse(CompactionManager.instance.submitMaximal(cfs, cfs.gcBefore((int) (System.currentTimeMillis() / 1000)), false).isEmpty());
}
- public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
+ public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn, long repairedAt, UUID pendingRepair, boolean isTransient)
{
Descriptor desc = cfs.newSSTableDescriptor(directory);
- return SSTableWriter.create(desc, 0, 0, null, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn);
+ return SSTableWriter.create(desc, 0, repairedAt, pendingRepair, isTransient, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn);
+ }
+
+ public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
+ {
+ return getWriter(cfs, directory, txn, 0, null, false);
}
public static ByteBuffer random(int i, int size)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
index 9aa4e28..aea3b4a 100644
--- a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
@@ -96,7 +96,7 @@ public class SSTableFlushObserverTest
KS_NAME, CF_NAME,
0,
sstableFormat),
- 10L, 0L, null, TableMetadataRef.forOfflineTools(cfm),
+ 10L, 0L, null, false, TableMetadataRef.forOfflineTools(cfm),
new MetadataCollector(cfm.comparator).sstableLevel(0),
new SerializationHeader(true, cfm, cfm.regularAndStaticColumns(), EncodingStats.NO_STATS),
Collections.singletonList(observer),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index 8ab1511..f109d8f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -99,7 +99,7 @@ public class MetadataSerializerTest
String partitioner = RandomPartitioner.class.getCanonicalName();
double bfFpChance = 0.1;
- return collector.finalizeMetadata(partitioner, bfFpChance, 0, null, SerializationHeader.make(cfm, Collections.emptyList()));
+ return collector.finalizeMetadata(partitioner, bfFpChance, 0, null, false, SerializationHeader.make(cfm, Collections.emptyList()));
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
index 202d7f1..bf1d940 100644
--- a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java
@@ -50,6 +50,16 @@ public class DynamicEndpointSnitchTest
Thread.sleep(150);
}
+ private static EndpointsForRange full(InetAddressAndPort... endpoints)
+ {
+ EndpointsForRange.Builder rlist = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE, endpoints.length);
+ for (InetAddressAndPort endpoint: endpoints)
+ {
+ rlist.add(ReplicaUtils.full(endpoint));
+ }
+ return rlist.build();
+ }
+
@Test
public void testSnitch() throws InterruptedException, IOException, ConfigurationException
{
@@ -66,41 +76,41 @@ public class DynamicEndpointSnitchTest
// first, make all hosts equal
setScores(dsnitch, 1, hosts, 10, 10, 10);
- List<InetAddressAndPort> order = Arrays.asList(host1, host2, host3);
- assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
+ EndpointsForRange order = full(host1, host2, host3);
+ assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3)));
// make host1 a little worse
setScores(dsnitch, 1, hosts, 20, 10, 10);
- order = Arrays.asList(host2, host3, host1);
- assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
+ order = full(host2, host3, host1);
+ assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3)));
// make host2 as bad as host1
setScores(dsnitch, 2, hosts, 15, 20, 10);
- order = Arrays.asList(host3, host1, host2);
- assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
+ order = full(host3, host1, host2);
+ assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3)));
// make host3 the worst
setScores(dsnitch, 3, hosts, 10, 10, 30);
- order = Arrays.asList(host1, host2, host3);
- assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
+ order = full(host1, host2, host3);
+ assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3)));
// make host3 equal to the others
setScores(dsnitch, 5, hosts, 10, 10, 10);
- order = Arrays.asList(host1, host2, host3);
- assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
+ order = full(host1, host2, host3);
+ assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3)));
/// Tests CASSANDRA-6683 improvements
// make the scores differ enough from the ideal order that we sort by score; under the old
// dynamic snitch behavior (where we only compared neighbors), these wouldn't get sorted
setScores(dsnitch, 20, hosts, 10, 70, 20);
- order = Arrays.asList(host1, host3, host2);
- assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3)));
+ order = full(host1, host3, host2);
+ assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3)));
- order = Arrays.asList(host4, host1, host3, host2);
- assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3, host4)));
+ order = full(host4, host1, host3, host2);
+ assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3, host4)));
setScores(dsnitch, 20, hosts, 10, 10, 10);
- order = Arrays.asList(host4, host1, host2, host3);
- assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3, host4)));
+ order = full(host4, host1, host2, host3);
+ assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3, host4)));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index ab6c6cd..5f6e26f 100644
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@ -25,6 +25,7 @@ import java.util.stream.Collectors;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.junit.Assert;
@@ -36,12 +37,17 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.TokenMetadata.Topology;
import org.apache.cassandra.service.StorageService;
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+
public class NetworkTopologyStrategyTest
{
private String keyspaceName = "Keyspace1";
@@ -51,6 +57,7 @@ public class NetworkTopologyStrategyTest
public static void setupDD()
{
DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
}
@Test
@@ -68,13 +75,14 @@ public class NetworkTopologyStrategyTest
// Set the localhost to the tokenmetadata. Embedded cassandra way?
NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
- assert strategy.getReplicationFactor("DC1") == 3;
- assert strategy.getReplicationFactor("DC2") == 2;
- assert strategy.getReplicationFactor("DC3") == 1;
+ assert strategy.getReplicationFactor("DC1").allReplicas == 3;
+ assert strategy.getReplicationFactor("DC2").allReplicas == 2;
+ assert strategy.getReplicationFactor("DC3").allReplicas == 1;
// Query for the natural hosts
- ArrayList<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(new StringToken("123"));
- assert 6 == endpoints.size();
- assert 6 == new HashSet<>(endpoints).size(); // ensure uniqueness
+ EndpointsForToken replicas = strategy.getNaturalReplicasForToken(new StringToken("123"));
+ assert 6 == replicas.size();
+ assert 6 == replicas.endpoints().size(); // ensure uniqueness
+ assert 6 == new HashSet<>(replicas.byEndpoint().values()).size(); // ensure uniqueness
}
@Test
@@ -92,13 +100,14 @@ public class NetworkTopologyStrategyTest
// Set the localhost to the tokenmetadata. Embedded cassandra way?
NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
- assert strategy.getReplicationFactor("DC1") == 3;
- assert strategy.getReplicationFactor("DC2") == 3;
- assert strategy.getReplicationFactor("DC3") == 0;
+ assert strategy.getReplicationFactor("DC1").allReplicas == 3;
+ assert strategy.getReplicationFactor("DC2").allReplicas == 3;
+ assert strategy.getReplicationFactor("DC3").allReplicas == 0;
// Query for the natural hosts
- ArrayList<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(new StringToken("123"));
- assert 6 == endpoints.size();
- assert 6 == new HashSet<>(endpoints).size(); // ensure uniqueness
+ EndpointsForToken replicas = strategy.getNaturalReplicasForToken(new StringToken("123"));
+ assert 6 == replicas.size();
+ assert 6 == replicas.endpoints().size(); // ensure uniqueness
+ assert 6 == new HashSet<>(replicas.byEndpoint().values()).size(); // ensure uniqueness
}
@Test
@@ -137,12 +146,13 @@ public class NetworkTopologyStrategyTest
for (String testToken : new String[]{"123456", "200000", "000402", "ffffff", "400200"})
{
- List<InetAddressAndPort> endpoints = strategy.calculateNaturalEndpoints(new StringToken(testToken), metadata);
- Set<InetAddressAndPort> epSet = new HashSet<>(endpoints);
+ EndpointsForRange replicas = strategy.calculateNaturalReplicas(new StringToken(testToken), metadata);
+ Set<InetAddressAndPort> endpointSet = replicas.endpoints();
- Assert.assertEquals(totalRF, endpoints.size());
- Assert.assertEquals(totalRF, epSet.size());
- logger.debug("{}: {}", testToken, endpoints);
+ Assert.assertEquals(totalRF, replicas.size());
+ Assert.assertEquals(totalRF, new HashSet<>(replicas.byEndpoint().values()).size());
+ Assert.assertEquals(totalRF, endpointSet.size());
+ logger.debug("{}: {}", testToken, replicas);
}
}
@@ -209,7 +219,7 @@ public class NetworkTopologyStrategyTest
{
Token token = Murmur3Partitioner.instance.getRandomToken(rand);
List<InetAddressAndPort> expected = calculateNaturalEndpoints(token, tokenMetadata, datacenters, snitch);
- List<InetAddressAndPort> actual = nts.calculateNaturalEndpoints(token, tokenMetadata);
+ List<InetAddressAndPort> actual = new ArrayList<>(nts.calculateNaturalReplicas(token, tokenMetadata).endpoints());
if (endpointsDiffer(expected, actual))
{
System.err.println("Endpoints mismatch for token " + token);
@@ -373,4 +383,50 @@ public class NetworkTopologyStrategyTest
Integer replicas = datacenters.get(dc);
return replicas == null ? 0 : replicas;
}
+
+ private static Token tk(long t)
+ {
+ return new LongToken(t);
+ }
+
+ private static Range<Token> range(long l, long r)
+ {
+ return new Range<>(tk(l), tk(r));
+ }
+
+ @Test
+ public void testTransientReplica() throws Exception
+ {
+ IEndpointSnitch snitch = new SimpleSnitch();
+ DatabaseDescriptor.setEndpointSnitch(snitch);
+
+ List<InetAddressAndPort> endpoints = Lists.newArrayList(InetAddressAndPort.getByName("127.0.0.1"),
+ InetAddressAndPort.getByName("127.0.0.2"),
+ InetAddressAndPort.getByName("127.0.0.3"),
+ InetAddressAndPort.getByName("127.0.0.4"));
+
+ Multimap<InetAddressAndPort, Token> tokens = HashMultimap.create();
+ tokens.put(endpoints.get(0), tk(100));
+ tokens.put(endpoints.get(1), tk(200));
+ tokens.put(endpoints.get(2), tk(300));
+ tokens.put(endpoints.get(3), tk(400));
+ TokenMetadata metadata = new TokenMetadata();
+ metadata.updateNormalTokens(tokens);
+
+ Map<String, String> configOptions = new HashMap<String, String>();
+ configOptions.put(snitch.getDatacenter((InetAddressAndPort) null), "3/1");
+
+ NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
+
+ Assert.assertEquals(EndpointsForRange.of(fullReplica(endpoints.get(0), range(400, 100)),
+ fullReplica(endpoints.get(1), range(400, 100)),
+ transientReplica(endpoints.get(2), range(400, 100))),
+ strategy.getNaturalReplicasForToken(tk(99)));
+
+
+ Assert.assertEquals(EndpointsForRange.of(fullReplica(endpoints.get(1), range(100, 200)),
+ fullReplica(endpoints.get(2), range(100, 200)),
+ transientReplica(endpoints.get(3), range(100, 200))),
+ strategy.getNaturalReplicasForToken(tk(101)));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org