You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/05/24 05:54:35 UTC

[04/15] cassandra git commit: Avoid holding SSTableReaders for duration of incremental repair

Avoid holding SSTableReaders for duration of incremental repair

Patch by Paulo Motta; reviewed by Marcus Eriksson for CASSANDRA-11739


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/675591d1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/675591d1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/675591d1

Branch: refs/heads/cassandra-3.0
Commit: 675591d1c55805c13db95692c78f5feb63538eb5
Parents: 129b119
Author: Paulo Motta <pa...@gmail.com>
Authored: Tue May 17 17:29:50 2016 -0300
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue May 24 07:25:44 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |   2 +-
 .../cassandra/service/ActiveRepairService.java  |  53 +++++----
 .../service/ActiveRepairServiceTest.java        | 107 +++++++++++++++++++
 4 files changed, 140 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/675591d1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7a0ccba..fcd7c3c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.15
+ * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739)
  * Add message dropped tasks to nodetool netstats (CASSANDRA-11855)
  * Don't compute expensive MaxPurgeableTimestamp until we've verified there's an 
    expired tombstone (CASSANDRA-11834)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/675591d1/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 96d873f..5af63fe 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -474,7 +474,7 @@ public class CompactionManager implements CompactionManagerMBean
     /**
      * Make sure the {validatedForRepair} are marked for compaction before calling this.
      *
-     * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getAndReferenceSSTables(..)).
+     * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)).
      *
      * @param cfs
      * @param ranges Ranges that the repair was carried out on

http://git-wip-us.apache.org/repos/asf/cassandra/blob/675591d1/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 732267e..5297ce3 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
@@ -42,7 +41,6 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
@@ -321,7 +319,7 @@ public class ActiveRepairService
         Set<SSTableReader> repairing = new HashSet<>();
         for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet())
         {
-            Collection<SSTableReader> sstables = entry.getValue().sstableMap.get(cfId);
+            Collection<SSTableReader> sstables = entry.getValue().getActiveSSTables(cfId);
             if (sstables != null && !entry.getKey().equals(parentRepairSession))
                 repairing.addAll(sstables);
         }
@@ -384,7 +382,7 @@ public class ActiveRepairService
         List<ListenableFuture<?>> futures = new ArrayList<>();
         for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
         {
-            Refs<SSTableReader> sstables = prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey());
+            Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefs(columnFamilyStoreEntry.getKey());
             ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
             futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
         }
@@ -428,7 +426,7 @@ public class ActiveRepairService
     {
         public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
         public final Collection<Range<Token>> ranges;
-        public final Map<UUID, Set<SSTableReader>> sstableMap = new HashMap<>();
+        public final Map<UUID, Set<String>> sstableMap = new HashMap<>();
         public final long repairedAt;
 
         public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt)
@@ -436,39 +434,50 @@ public class ActiveRepairService
             for (ColumnFamilyStore cfs : columnFamilyStores)
             {
                 this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
-                sstableMap.put(cfs.metadata.cfId, new HashSet<SSTableReader>());
+                sstableMap.put(cfs.metadata.cfId, new HashSet<String>());
             }
             this.ranges = ranges;
             this.repairedAt = repairedAt;
         }
 
-        public synchronized Refs<SSTableReader> getAndReferenceSSTables(UUID cfId)
+        @SuppressWarnings("resource")
+        public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefs(UUID cfId)
         {
-            Set<SSTableReader> sstables = sstableMap.get(cfId);
-            Iterator<SSTableReader> sstableIterator = sstables.iterator();
             ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder();
-            while (sstableIterator.hasNext())
+            for (SSTableReader sstable : getActiveSSTables(cfId))
             {
-                SSTableReader sstable = sstableIterator.next();
-                if (!new File(sstable.descriptor.filenameFor(Component.DATA)).exists())
-                {
-                    sstableIterator.remove();
-                }
+                Ref<SSTableReader> ref = sstable.tryRef();
+                if (ref == null)
+                    sstableMap.get(cfId).remove(sstable.getFilename());
                 else
+                    references.put(sstable, ref);
+            }
+            return new Refs<>(references.build());
+        }
+
+        private Set<SSTableReader> getActiveSSTables(UUID cfId)
+        {
+            Set<String> repairedSSTables = sstableMap.get(cfId);
+            Set<SSTableReader> activeSSTables = new HashSet<>();
+            Set<String> activeSSTableNames = new HashSet<>();
+            for (SSTableReader sstable : columnFamilyStores.get(cfId).getSSTables())
+            {
+                if (repairedSSTables.contains(sstable.getFilename()))
                 {
-                    Ref<SSTableReader> ref = sstable.tryRef();
-                    if (ref == null)
-                        sstableIterator.remove();
-                    else
-                        references.put(sstable, ref);
+                    activeSSTables.add(sstable);
+                    activeSSTableNames.add(sstable.getFilename());
                 }
             }
-            return new Refs<>(references.build());
+            sstableMap.put(cfId, activeSSTableNames);
+            return activeSSTables;
         }
 
         public void addSSTables(UUID cfId, Collection<SSTableReader> sstables)
         {
-            sstableMap.get(cfId).addAll(sstables);
+            for (SSTableReader sstable : sstables)
+            {
+                sstableMap.get(cfId).add(sstable.getFilename());
+            }
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/675591d1/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
new file mode 100644
index 0000000..419ea1a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.Sets;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class ActiveRepairServiceTest extends SchemaLoader
+{
+
+    private static final String KEYSPACE1 = "Keyspace1";
+    private static final String CF = "Standard1";
+
+    @Test
+    public void testGetActiveRepairedSSTableRefs()
+    {
+        ColumnFamilyStore store = prepareColumnFamilyStore();
+        Set<SSTableReader> original = store.getUnrepairedSSTables();
+
+        UUID prsId = UUID.randomUUID();
+        ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store), null);
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
+
+        //add all sstables to parent repair session
+        prs.addSSTables(store.metadata.cfId, original);
+
+        //retrieve all sstable references from parent repair session
+        Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+        Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+        assertEquals(original, retrieved);
+        refs.release();
+
+        //remove 1 sstable from data tracker
+        Set<SSTableReader> newLiveSet = new HashSet<>(original);
+        Iterator<SSTableReader> it = newLiveSet.iterator();
+        SSTableReader removed = it.next();
+        it.remove();
+        store.getDataTracker().replaceWithNewInstances(Collections.singleton(removed), Collections.EMPTY_SET);
+
+        //retrieve sstable references from parent repair session again - removed sstable must not be present
+        refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+        retrieved = Sets.newHashSet(refs.iterator());
+        assertEquals(newLiveSet, retrieved);
+        assertFalse(retrieved.contains(removed));
+        refs.release();
+    }
+
+    private ColumnFamilyStore prepareColumnFamilyStore()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+        store.truncateBlocking();
+        store.disableAutoCompaction();
+        long timestamp = System.currentTimeMillis();
+        //create 10 sstables
+        for (int i = 0; i < 10; i++)
+        {
+            DecoratedKey key = Util.dk(Integer.toString(i));
+            Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+            for (int j = 0; j < 10; j++)
+                rm.add("Standard1", Util.cellname(Integer.toString(j)),
+                       ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                       timestamp,
+                       0);
+            rm.apply();
+            store.forceBlockingFlush();
+        }
+        return store;
+    }
+}