You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2017/03/16 22:42:09 UTC

cassandra git commit: Incremental repair not streaming correct sstables

Repository: cassandra
Updated Branches:
  refs/heads/trunk 28b838ed6 -> d6a701ea1


Incremental repair not streaming correct sstables

Patch by Blake Eggleston; reviewed by Marcus Eriksson for CASSANDRA-13328


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d6a701ea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d6a701ea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d6a701ea

Branch: refs/heads/trunk
Commit: d6a701ea11c919938cb09b0fca2ea0ec7ad2123b
Parents: 28b838e
Author: Blake Eggleston <bd...@gmail.com>
Authored: Tue Mar 14 15:37:57 2017 -0700
Committer: Blake Eggleston <bd...@gmail.com>
Committed: Thu Mar 16 15:41:54 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/StreamSession.java      |  21 ++-
 .../io/sstable/SSTableRewriterTest.java         |   4 +-
 .../cassandra/streaming/StreamSessionTest.java  | 132 +++++++++++++++++++
 4 files changed, 151 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6a701ea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a162aeb..4f856e2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Incremental repair not streaming correct sstables (CASSANDRA-13328)
  * Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
  * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132)
  * Remove config option index_interval (CASSANDRA-10671)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6a701ea/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index b7db2b2..7ee99db 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -26,6 +26,8 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.*;
 
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
@@ -320,7 +322,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             flushSSTables(stores);
 
         List<Range<Token>> normalizedRanges = Range.normalize(ranges);
-        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, isIncremental);
+        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, repairedAt, pendingRepair);
         try
         {
             addTransferFiles(sections);
@@ -362,7 +364,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     }
 
     @VisibleForTesting
-    public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, final boolean isIncremental)
+    public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, UUID pendingRepair)
     {
         Refs<SSTableReader> refs = new Refs<>();
         try
@@ -375,6 +377,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 refs.addAll(cfStore.selectAndReference(view -> {
                     Set<SSTableReader> sstables = Sets.newHashSet();
                     SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL));
+                    Predicate<SSTableReader> predicate;
+                    if (pendingRepair == ActiveRepairService.NO_PENDING_REPAIR)
+                    {
+                        predicate = Predicates.alwaysTrue();
+                    }
+                    else
+                    {
+                        predicate = s -> s.isPendingRepair() && s.getSSTableMetadata().pendingRepair.equals(pendingRepair);
+                    }
+
                     for (Range<PartitionPosition> keyRange : keyRanges)
                     {
                         // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end).
@@ -383,10 +395,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                         // sort after all keys having the token. That "fake" key cannot however be equal to any real key, so that even
                         // including keyRange.left will still exclude any key having the token of the original token range, and so we're
                         // still actually selecting what we wanted.
-                        for (SSTableReader sstable : View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree))
+                        for (SSTableReader sstable : Iterables.filter(View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree), predicate))
                         {
-                            if (!isIncremental || !sstable.isRepaired())
-                                sstables.add(sstable);
+                            sstables.add(sstable);
                         }
                     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6a701ea/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 41a6828..b8595af 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -789,7 +789,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
 
         List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges(
             Collections.singleton(new Range<Token>(firstToken, firstToken)),
-            Collections.singleton(cfs), 0L, false);
+            Collections.singleton(cfs), 0L, null);
         assertEquals(1, sectionsBeforeRewrite.size());
         for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite)
             section.ref.release();
@@ -804,7 +804,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
                 while (!done.get())
                 {
                     Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken));
-                    List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, false);
+                    List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), 0L, null);
                     if (sections.size() != 1)
                         failed.set(true);
                     for (StreamSession.SSTableStreamingSections section : sections)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d6a701ea/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
new file mode 100644
index 0000000..8d388ab
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class StreamSessionTest
+{
+    private String keyspace = null;
+    private static final String table = "tbl";
+
+    private TableMetadata tbm;
+    private ColumnFamilyStore cfs;
+
+    @BeforeClass
+    public static void setupClass() throws Exception
+    {
+        SchemaLoader.prepareServer();
+    }
+
+    @Before
+    public void createKeyspace() throws Exception
+    {
+        keyspace = String.format("ks_%s", System.currentTimeMillis());
+        tbm = CreateTableStatement.parse(String.format("CREATE TABLE %s (k INT PRIMARY KEY, v INT)", table), keyspace).build();
+        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), tbm);
+        cfs = Schema.instance.getColumnFamilyStoreInstance(tbm.id);
+    }
+
+    private SSTableReader createSSTable(Runnable queryable)
+    {
+        Set<SSTableReader> before = cfs.getLiveSSTables();
+        queryable.run();
+        cfs.forceBlockingFlush();
+        Set<SSTableReader> after = cfs.getLiveSSTables();
+
+        Set<SSTableReader> diff = Sets.difference(after, before);
+        assert diff.size() == 1 : "Expected 1 new sstable, got " + diff.size();
+        return diff.iterator().next();
+    }
+
+    private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException
+    {
+        Descriptor descriptor = sstable.descriptor;
+        descriptor.getMetadataSerializer().mutateRepaired(descriptor, repairedAt, pendingRepair);
+        sstable.reloadSSTableMetadata();
+
+    }
+
+    private Set<SSTableReader> selectReaders(UUID pendingRepair)
+    {
+        IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+        Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
+        List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(ranges,
+                                                                                                          Lists.newArrayList(cfs),
+                                                                                                          ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                                                                          pendingRepair);
+        Set<SSTableReader> sstables = new HashSet<>();
+        for (StreamSession.SSTableStreamingSections section: sections)
+        {
+            sstables.add(section.ref.get());
+        }
+        return sstables;
+    }
+
+    @Test
+    public void incrementalSSTableSelection() throws Exception
+    {
+        // make 3 tables, 1 unrepaired, 2 pending repair with different repair ids, and 1 repaired
+        SSTableReader sstable1 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (1, 1)", keyspace, table)));
+        SSTableReader sstable2 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (2, 2)", keyspace, table)));
+        SSTableReader sstable3 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (3, 3)", keyspace, table)));
+        SSTableReader sstable4 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (4, 4)", keyspace, table)));
+
+
+        UUID pendingRepair = UUIDGen.getTimeUUID();
+        long repairedAt = System.currentTimeMillis();
+        mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair);
+        mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID());
+        mutateRepaired(sstable4, repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
+
+        // no pending repair should return all sstables
+        Assert.assertEquals(Sets.newHashSet(sstable1, sstable2, sstable3, sstable4), selectReaders(ActiveRepairService.NO_PENDING_REPAIR));
+
+        // a pending repair arg should only return sstables with the same pending repair id
+        Assert.assertEquals(Sets.newHashSet(sstable2), selectReaders(pendingRepair));
+    }
+}