You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2017/03/16 22:42:09 UTC
cassandra git commit: Incremental repair not streaming correct sstables
Repository: cassandra
Updated Branches:
refs/heads/trunk 28b838ed6 -> d6a701ea1
Incremental repair not streaming correct sstables
Patch by Blake Eggleston; reviewed by Marcus Eriksson for CASSANDRA-13328
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d6a701ea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d6a701ea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d6a701ea
Branch: refs/heads/trunk
Commit: d6a701ea11c919938cb09b0fca2ea0ec7ad2123b
Parents: 28b838e
Author: Blake Eggleston <bd...@gmail.com>
Authored: Tue Mar 14 15:37:57 2017 -0700
Committer: Blake Eggleston <bd...@gmail.com>
Committed: Thu Mar 16 15:41:54 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 21 ++-
.../io/sstable/SSTableRewriterTest.java | 4 +-
.../cassandra/streaming/StreamSessionTest.java | 132 +++++++++++++++++++
4 files changed, 151 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6a701ea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a162aeb..4f856e2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Incremental repair not streaming correct sstables (CASSANDRA-13328)
* Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
* Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132)
* Remove config option index_interval (CASSANDRA-10671)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6a701ea/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index b7db2b2..7ee99db 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -26,6 +26,8 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import com.google.common.collect.*;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
@@ -320,7 +322,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
flushSSTables(stores);
List<Range<Token>> normalizedRanges = Range.normalize(ranges);
- List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, isIncremental);
+ List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, pendingRepair);
try
{
addTransferFiles(sections);
@@ -362,7 +364,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
@VisibleForTesting
- public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, final boolean isIncremental)
+ public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, UUID pendingRepair)
{
Refs<SSTableReader> refs = new Refs<>();
try
@@ -375,6 +377,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
refs.addAll(cfStore.selectAndReference(view -> {
Set<SSTableReader> sstables = Sets.newHashSet();
SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL));
+ Predicate<SSTableReader> predicate;
+ if (pendingRepair == ActiveRepairService.NO_PENDING_REPAIR)
+ {
+ predicate = Predicates.alwaysTrue();
+ }
+ else
+ {
+ predicate = s -> s.isPendingRepair() && s.getSSTableMetadata().pendingRepair.equals(pendingRepair);
+ }
+
for (Range<PartitionPosition> keyRange : keyRanges)
{
// keyRange excludes its start, while sstableInBounds is inclusive (of both start and end).
@@ -383,10 +395,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
// sort after all keys having the token. That "fake" key cannot however be equal to any real key, so that even
// including keyRange.left will still exclude any key having the token of the original token range, and so we're
// still actually selecting what we wanted.
- for (SSTableReader sstable : View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree))
+ for (SSTableReader sstable : Iterables.filter(View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree), predicate))
{
- if (!isIncremental || !sstable.isRepaired())
- sstables.add(sstable);
+ sstables.add(sstable);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6a701ea/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 41a6828..b8595af 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -789,7 +789,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges(
Collections.singleton(new Range<Token>(firstToken, firstToken)),
- Collections.singleton(cfs), 0L, false);
+ Collections.singleton(cfs), 0L, null);
assertEquals(1, sectionsBeforeRewrite.size());
for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite)
section.ref.release();
@@ -804,7 +804,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
while (!done.get())
{
Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken));
- List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, false);
+ List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, null);
if (sections.size() != 1)
failed.set(true);
for (StreamSession.SSTableStreamingSections section : sections)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6a701ea/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
new file mode 100644
index 0000000..8d388ab
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class StreamSessionTest
+{
+ private String keyspace = null;
+ private static final String table = "tbl";
+
+ private TableMetadata tbm;
+ private ColumnFamilyStore cfs;
+
+ @BeforeClass
+ public static void setupClass() throws Exception
+ {
+ SchemaLoader.prepareServer();
+ }
+
+ @Before
+ public void createKeyspace() throws Exception
+ {
+ keyspace = String.format("ks_%s", System.currentTimeMillis());
+ tbm = CreateTableStatement.parse(String.format("CREATE TABLE %s (k INT PRIMARY KEY, v INT)", table), keyspace).build();
+ SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), tbm);
+ cfs = Schema.instance.getColumnFamilyStoreInstance(tbm.id);
+ }
+
+ private SSTableReader createSSTable(Runnable queryable)
+ {
+ Set<SSTableReader> before = cfs.getLiveSSTables();
+ queryable.run();
+ cfs.forceBlockingFlush();
+ Set<SSTableReader> after = cfs.getLiveSSTables();
+
+ Set<SSTableReader> diff = Sets.difference(after, before);
+ assert diff.size() == 1 : "Expected 1 new sstable, got " + diff.size();
+ return diff.iterator().next();
+ }
+
+ private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException
+ {
+ Descriptor descriptor = sstable.descriptor;
+ descriptor.getMetadataSerializer().mutateRepaired(descriptor, repairedAt, pendingRepair);
+ sstable.reloadSSTableMetadata();
+
+ }
+
+ private Set<SSTableReader> selectReaders(UUID pendingRepair)
+ {
+ IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+ Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
+ List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(ranges,
+ Lists.newArrayList(cfs),
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ pendingRepair);
+ Set<SSTableReader> sstables = new HashSet<>();
+ for (StreamSession.SSTableStreamingSections section: sections)
+ {
+ sstables.add(section.ref.get());
+ }
+ return sstables;
+ }
+
+ @Test
+ public void incrementalSSTableSelection() throws Exception
+ {
+ // make 4 sstables: 1 unrepaired, 2 pending repair with different repair ids, and 1 repaired
+ SSTableReader sstable1 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (1, 1)", keyspace, table)));
+ SSTableReader sstable2 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (2, 2)", keyspace, table)));
+ SSTableReader sstable3 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (3, 3)", keyspace, table)));
+ SSTableReader sstable4 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (4, 4)", keyspace, table)));
+
+
+ UUID pendingRepair = UUIDGen.getTimeUUID();
+ long repairedAt = System.currentTimeMillis();
+ mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair);
+ mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+ mutateRepaired(sstable4, repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
+
+ // no pending repair should return all sstables
+ Assert.assertEquals(Sets.newHashSet(sstable1, sstable2, sstable3, sstable4), selectReaders(ActiveRepairService.NO_PENDING_REPAIR));
+
+ // a pending repair arg should only return sstables with the same pending repair id
+ Assert.assertEquals(Sets.newHashSet(sstable2), selectReaders(pendingRepair));
+ }
+}