You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/05/24 05:54:35 UTC
[04/15] cassandra git commit: Avoid holding SSTableReaders for
duration of incremental repair
Avoid holding SSTableReaders for duration of incremental repair
Patch by Paulo Motta; reviewed by Marcus Eriksson for CASSANDRA-11739
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/675591d1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/675591d1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/675591d1
Branch: refs/heads/cassandra-3.0
Commit: 675591d1c55805c13db95692c78f5feb63538eb5
Parents: 129b119
Author: Paulo Motta <pa...@gmail.com>
Authored: Tue May 17 17:29:50 2016 -0300
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue May 24 07:25:44 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 2 +-
.../cassandra/service/ActiveRepairService.java | 53 +++++----
.../service/ActiveRepairServiceTest.java | 107 +++++++++++++++++++
4 files changed, 140 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/675591d1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7a0ccba..fcd7c3c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.15
+ * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739)
* Add message dropped tasks to nodetool netstats (CASSANDRA-11855)
* Don't compute expensive MaxPurgeableTimestamp until we've verified there's an
expired tombstone (CASSANDRA-11834)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/675591d1/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 96d873f..5af63fe 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -474,7 +474,7 @@ public class CompactionManager implements CompactionManagerMBean
/**
* Make sure the {validatedForRepair} are marked for compaction before calling this.
*
- * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getAndReferenceSSTables(..)).
+ * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)).
*
* @param cfs
* @param ranges Ranges that the repair was carried out on
http://git-wip-us.apache.org/repos/asf/cassandra/blob/675591d1/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 732267e..5297ce3 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.service;
-import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
@@ -42,7 +41,6 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
@@ -321,7 +319,7 @@ public class ActiveRepairService
Set<SSTableReader> repairing = new HashSet<>();
for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet())
{
- Collection<SSTableReader> sstables = entry.getValue().sstableMap.get(cfId);
+ Collection<SSTableReader> sstables = entry.getValue().getActiveSSTables(cfId);
if (sstables != null && !entry.getKey().equals(parentRepairSession))
repairing.addAll(sstables);
}
@@ -384,7 +382,7 @@ public class ActiveRepairService
List<ListenableFuture<?>> futures = new ArrayList<>();
for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
{
- Refs<SSTableReader> sstables = prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey());
+ Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefs(columnFamilyStoreEntry.getKey());
ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
}
@@ -428,7 +426,7 @@ public class ActiveRepairService
{
public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
public final Collection<Range<Token>> ranges;
- public final Map<UUID, Set<SSTableReader>> sstableMap = new HashMap<>();
+ public final Map<UUID, Set<String>> sstableMap = new HashMap<>();
public final long repairedAt;
public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt)
@@ -436,39 +434,50 @@ public class ActiveRepairService
for (ColumnFamilyStore cfs : columnFamilyStores)
{
this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
- sstableMap.put(cfs.metadata.cfId, new HashSet<SSTableReader>());
+ sstableMap.put(cfs.metadata.cfId, new HashSet<String>());
}
this.ranges = ranges;
this.repairedAt = repairedAt;
}
- public synchronized Refs<SSTableReader> getAndReferenceSSTables(UUID cfId)
+ @SuppressWarnings("resource")
+ public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefs(UUID cfId)
{
- Set<SSTableReader> sstables = sstableMap.get(cfId);
- Iterator<SSTableReader> sstableIterator = sstables.iterator();
ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder();
- while (sstableIterator.hasNext())
+ for (SSTableReader sstable : getActiveSSTables(cfId))
{
- SSTableReader sstable = sstableIterator.next();
- if (!new File(sstable.descriptor.filenameFor(Component.DATA)).exists())
- {
- sstableIterator.remove();
- }
+ Ref<SSTableReader> ref = sstable.tryRef();
+ if (ref == null)
+ sstableMap.get(cfId).remove(sstable.getFilename());
else
+ references.put(sstable, ref);
+ }
+ return new Refs<>(references.build());
+ }
+
+ private Set<SSTableReader> getActiveSSTables(UUID cfId)
+ {
+ Set<String> repairedSSTables = sstableMap.get(cfId);
+ Set<SSTableReader> activeSSTables = new HashSet<>();
+ Set<String> activeSSTableNames = new HashSet<>();
+ for (SSTableReader sstable : columnFamilyStores.get(cfId).getSSTables())
+ {
+ if (repairedSSTables.contains(sstable.getFilename()))
{
- Ref<SSTableReader> ref = sstable.tryRef();
- if (ref == null)
- sstableIterator.remove();
- else
- references.put(sstable, ref);
+ activeSSTables.add(sstable);
+ activeSSTableNames.add(sstable.getFilename());
}
}
- return new Refs<>(references.build());
+ sstableMap.put(cfId, activeSSTableNames);
+ return activeSSTables;
}
public void addSSTables(UUID cfId, Collection<SSTableReader> sstables)
{
- sstableMap.get(cfId).addAll(sstables);
+ for (SSTableReader sstable : sstables)
+ {
+ sstableMap.get(cfId).add(sstable.getFilename());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/675591d1/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
new file mode 100644
index 0000000..419ea1a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.Sets;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class ActiveRepairServiceTest extends SchemaLoader
+{
+
+ private static final String KEYSPACE1 = "Keyspace1";
+ private static final String CF = "Standard1";
+
+ @Test
+ public void testGetActiveRepairedSSTableRefs()
+ {
+ ColumnFamilyStore store = prepareColumnFamilyStore();
+ Set<SSTableReader> original = store.getUnrepairedSSTables();
+
+ UUID prsId = UUID.randomUUID();
+ ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store), null);
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
+
+ //add all sstables to parent repair session
+ prs.addSSTables(store.metadata.cfId, original);
+
+ //retrieve all sstable references from parent repair sessions
+ Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+ Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+ assertEquals(original, retrieved);
+ refs.release();
+
+ //remove 1 sstable from data data tracker
+ Set<SSTableReader> newLiveSet = new HashSet<>(original);
+ Iterator<SSTableReader> it = newLiveSet.iterator();
+ SSTableReader removed = it.next();
+ it.remove();
+ store.getDataTracker().replaceWithNewInstances(Collections.singleton(removed), Collections.EMPTY_SET);
+
+ //retrieve sstable references from parent repair session again - removed sstable must not be present
+ refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+ retrieved = Sets.newHashSet(refs.iterator());
+ assertEquals(newLiveSet, retrieved);
+ assertFalse(retrieved.contains(removed));
+ refs.release();
+ }
+
+ private ColumnFamilyStore prepareColumnFamilyStore()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.truncateBlocking();
+ store.disableAutoCompaction();
+ long timestamp = System.currentTimeMillis();
+ //create 10 sstables
+ for (int i = 0; i < 10; i++)
+ {
+ DecoratedKey key = Util.dk(Integer.toString(i));
+ Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+ for (int j = 0; j < 10; j++)
+ rm.add("Standard1", Util.cellname(Integer.toString(j)),
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ timestamp,
+ 0);
+ rm.apply();
+ store.forceBlockingFlush();
+ }
+ return store;
+ }
+}