You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:36:08 UTC
[16/18] cassandra git commit: Transient Replication and Cheap Quorums
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 9766454..afe628b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -112,6 +113,7 @@ public class CompactionStrategyManager implements INotificationConsumer
/**
* Variables guarded by read and write lock above
*/
+ private final PendingRepairHolder transientRepairs;
private final PendingRepairHolder pendingRepairs;
private final CompactionStrategyHolder repaired;
private final CompactionStrategyHolder unrepaired;
@@ -156,10 +158,11 @@ public class CompactionStrategyManager implements INotificationConsumer
return compactionStrategyIndexForDirectory(descriptor);
}
};
- pendingRepairs = new PendingRepairHolder(cfs, router);
+ transientRepairs = new PendingRepairHolder(cfs, router, true);
+ pendingRepairs = new PendingRepairHolder(cfs, router, false);
repaired = new CompactionStrategyHolder(cfs, router, true);
unrepaired = new CompactionStrategyHolder(cfs, router, false);
- holders = ImmutableList.of(pendingRepairs, repaired, unrepaired);
+ holders = ImmutableList.of(transientRepairs, pendingRepairs, repaired, unrepaired);
cfs.getTracker().subscribe(this);
logger.trace("{} subscribed to the data tracker.", this);
@@ -176,7 +179,6 @@ public class CompactionStrategyManager implements INotificationConsumer
* Return the next background task
*
* Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
- *
*/
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
@@ -188,18 +190,16 @@ public class CompactionStrategyManager implements INotificationConsumer
return null;
int numPartitions = getNumTokenPartitions();
+
// first try to promote/demote sstables from completed repairs
- List<TaskSupplier> repairFinishedSuppliers = pendingRepairs.getRepairFinishedTaskSuppliers();
- if (!repairFinishedSuppliers.isEmpty())
- {
- Collections.sort(repairFinishedSuppliers);
- for (TaskSupplier supplier : repairFinishedSuppliers)
- {
- AbstractCompactionTask task = supplier.getTask();
- if (task != null)
- return task;
- }
- }
+ AbstractCompactionTask repairFinishedTask;
+ repairFinishedTask = pendingRepairs.getNextRepairFinishedTask();
+ if (repairFinishedTask != null)
+ return repairFinishedTask;
+
+ repairFinishedTask = transientRepairs.getNextRepairFinishedTask();
+ if (repairFinishedTask != null)
+ return repairFinishedTask;
// sort compaction task suppliers by remaining tasks descending
List<TaskSupplier> suppliers = new ArrayList<>(numPartitions * holders.size());
@@ -393,64 +393,28 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
-
-
@VisibleForTesting
- List<AbstractCompactionStrategy> getRepaired()
+ CompactionStrategyHolder getRepairedUnsafe()
{
- readLock.lock();
- try
- {
- return Lists.newArrayList(repaired.allStrategies());
- }
- finally
- {
- readLock.unlock();
- }
+ return repaired;
}
@VisibleForTesting
- List<AbstractCompactionStrategy> getUnrepaired()
+ CompactionStrategyHolder getUnrepairedUnsafe()
{
- readLock.lock();
- try
- {
- return Lists.newArrayList(unrepaired.allStrategies());
- }
- finally
- {
- readLock.unlock();
- }
+ return unrepaired;
}
@VisibleForTesting
- Iterable<AbstractCompactionStrategy> getForPendingRepair(UUID sessionID)
+ PendingRepairHolder getPendingRepairsUnsafe()
{
- readLock.lock();
- try
- {
- return pendingRepairs.getStrategiesFor(sessionID);
- }
- finally
- {
- readLock.unlock();
- }
+ return pendingRepairs;
}
@VisibleForTesting
- Set<UUID> pendingRepairs()
+ PendingRepairHolder getTransientRepairsUnsafe()
{
- readLock.lock();
- try
- {
- Set<UUID> ids = new HashSet<>();
- pendingRepairs.getManagers().forEach(p -> ids.addAll(p.getSessions()));
- return ids;
- }
- finally
- {
- readLock.unlock();
- }
+ return transientRepairs;
}
public boolean hasDataForPendingRepair(UUID sessionID)
@@ -458,8 +422,7 @@ public class CompactionStrategyManager implements INotificationConsumer
readLock.lock();
try
{
- return Iterables.any(pendingRepairs.getManagers(),
- prm -> prm.hasDataForSession(sessionID));
+ return pendingRepairs.hasDataForSession(sessionID) || transientRepairs.hasDataForSession(sessionID);
}
finally
{
@@ -682,18 +645,19 @@ public class CompactionStrategyManager implements INotificationConsumer
throw new IllegalStateException("No holder claimed " + sstable);
}
- private AbstractStrategyHolder getHolder(long repairedAt, UUID pendingRepair)
+ private AbstractStrategyHolder getHolder(long repairedAt, UUID pendingRepair, boolean isTransient)
{
return getHolder(repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE,
- pendingRepair != ActiveRepairService.NO_PENDING_REPAIR);
+ pendingRepair != ActiveRepairService.NO_PENDING_REPAIR,
+ isTransient);
}
@VisibleForTesting
- AbstractStrategyHolder getHolder(boolean isRepaired, boolean isPendingRepair)
+ AbstractStrategyHolder getHolder(boolean isRepaired, boolean isPendingRepair, boolean isTransient)
{
for (AbstractStrategyHolder holder : holders)
{
- if (holder.managesRepairedGroup(isRepaired, isPendingRepair))
+ if (holder.managesRepairedGroup(isRepaired, isPendingRepair, isTransient))
return holder;
}
@@ -1146,16 +1110,26 @@ public class CompactionStrategyManager implements INotificationConsumer
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
MetadataCollector collector,
SerializationHeader header,
Collection<Index> indexes,
LifecycleTransaction txn)
{
+ SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient);
maybeReloadDiskBoundaries();
readLock.lock();
try
{
- return getHolder(repairedAt, pendingRepair).createSSTableMultiWriter(descriptor, keyCount, repairedAt, pendingRepair, collector, header, indexes, txn);
+ return getHolder(repairedAt, pendingRepair, isTransient).createSSTableMultiWriter(descriptor,
+ keyCount,
+ repairedAt,
+ pendingRepair,
+ isTransient,
+ collector,
+ header,
+ indexes,
+ txn);
}
finally
{
@@ -1220,7 +1194,7 @@ public class CompactionStrategyManager implements INotificationConsumer
* Mutates sstable repairedAt times and notifies listeners of the change with the writeLock held. Prevents races
* with other processes between when the metadata is changed and when sstables are moved between strategies.
*/
- public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, UUID pendingRepair) throws IOException
+ public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException
{
Set<SSTableReader> changed = new HashSet<>();
@@ -1229,7 +1203,7 @@ public class CompactionStrategyManager implements INotificationConsumer
{
for (SSTableReader sstable: sstables)
{
- sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair);
+ sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient);
sstable.reloadSSTableMetadata();
changed.add(sstable);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 662384c..591b7c4 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -29,20 +29,19 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
-import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.FBUtilities;
@@ -339,6 +338,23 @@ public class CompactionTask extends AbstractCompactionTask
return ids.iterator().next();
}
+ public static boolean getIsTransient(Set<SSTableReader> sstables)
+ {
+ if (sstables.isEmpty())
+ {
+ return false;
+ }
+
+ boolean isTransient = sstables.iterator().next().isTransient();
+
+ if (!Iterables.all(sstables, sstable -> sstable.isTransient() == isTransient))
+ {
+ throw new RuntimeException("Attempting to compact transient sstables with non transient sstables");
+ }
+
+ return isTransient;
+ }
+
/*
* Checks if we have enough disk space to execute the compaction. Drops the largest sstable out of the Task until
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
index 7b9123f..92e44a7 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairHolder.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -43,10 +44,12 @@ import org.apache.cassandra.service.ActiveRepairService;
public class PendingRepairHolder extends AbstractStrategyHolder
{
private final List<PendingRepairManager> managers = new ArrayList<>();
+ private final boolean isTransient;
- public PendingRepairHolder(ColumnFamilyStore cfs, DestinationRouter router)
+ public PendingRepairHolder(ColumnFamilyStore cfs, DestinationRouter router, boolean isTransient)
{
super(cfs, router);
+ this.isTransient = isTransient;
}
@Override
@@ -66,15 +69,15 @@ public class PendingRepairHolder extends AbstractStrategyHolder
{
managers.clear();
for (int i = 0; i < numTokenPartitions; i++)
- managers.add(new PendingRepairManager(cfs, params));
+ managers.add(new PendingRepairManager(cfs, params, isTransient));
}
@Override
- public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair)
+ public boolean managesRepairedGroup(boolean isRepaired, boolean isPendingRepair, boolean isTransient)
{
Preconditions.checkArgument(!isPendingRepair || !isRepaired,
"SSTables cannot be both repaired and pending repair");
- return isPendingRepair;
+ return isPendingRepair && (this.isTransient == isTransient);
}
@Override
@@ -145,7 +148,23 @@ public class PendingRepairHolder extends AbstractStrategyHolder
return tasks;
}
- public ArrayList<TaskSupplier> getRepairFinishedTaskSuppliers()
+ AbstractCompactionTask getNextRepairFinishedTask()
+ {
+ List<TaskSupplier> repairFinishedSuppliers = getRepairFinishedTaskSuppliers();
+ if (!repairFinishedSuppliers.isEmpty())
+ {
+ Collections.sort(repairFinishedSuppliers);
+ for (TaskSupplier supplier : repairFinishedSuppliers)
+ {
+ AbstractCompactionTask task = supplier.getTask();
+ if (task != null)
+ return task;
+ }
+ }
+ return null;
+ }
+
+ private ArrayList<TaskSupplier> getRepairFinishedTaskSuppliers()
{
ArrayList<TaskSupplier> suppliers = new ArrayList<>(managers.size());
for (PendingRepairManager manager : managers)
@@ -218,6 +237,7 @@ public class PendingRepairHolder extends AbstractStrategyHolder
long keyCount,
long repairedAt,
UUID pendingRepair,
+ boolean isTransient,
MetadataCollector collector,
SerializationHeader header,
Collection<Index> indexes,
@@ -233,6 +253,7 @@ public class PendingRepairHolder extends AbstractStrategyHolder
keyCount,
repairedAt,
pendingRepair,
+ isTransient,
collector,
header,
indexes,
@@ -249,4 +270,15 @@ public class PendingRepairHolder extends AbstractStrategyHolder
}
return -1;
}
+
+ public boolean hasDataForSession(UUID sessionID)
+ {
+ return Iterables.any(managers, prm -> prm.hasDataForSession(sessionID));
+ }
+
+ @Override
+ public boolean containsSSTable(SSTableReader sstable)
+ {
+ return Iterables.any(managers, prm -> prm.containsSSTable(sstable));
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index edc9a2f..6763abf 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -30,7 +30,9 @@ import java.util.UUID;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
@@ -62,6 +64,7 @@ class PendingRepairManager
private final ColumnFamilyStore cfs;
private final CompactionParams params;
+ private final boolean isTransient;
private volatile ImmutableMap<UUID, AbstractCompactionStrategy> strategies = ImmutableMap.of();
/**
@@ -75,10 +78,11 @@ class PendingRepairManager
}
}
- PendingRepairManager(ColumnFamilyStore cfs, CompactionParams params)
+ PendingRepairManager(ColumnFamilyStore cfs, CompactionParams params, boolean isTransient)
{
this.cfs = cfs;
this.params = params;
+ this.isTransient = isTransient;
}
private ImmutableMap.Builder<UUID, AbstractCompactionStrategy> mapBuilder()
@@ -156,6 +160,7 @@ class PendingRepairManager
synchronized void addSSTable(SSTableReader sstable)
{
+ Preconditions.checkArgument(sstable.isTransient() == isTransient);
getOrCreate(sstable).addSSTable(sstable);
}
@@ -389,6 +394,15 @@ class PendingRepairManager
return strategies.keySet().contains(sessionID);
}
+ boolean containsSSTable(SSTableReader sstable)
+ {
+ if (!sstable.isPendingRepair())
+ return false;
+
+ AbstractCompactionStrategy strategy = strategies.get(sstable.getPendingRepair());
+ return strategy != null && strategy.getSSTables().contains(sstable);
+ }
+
public Collection<AbstractCompactionTask> createUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
{
Map<UUID, List<SSTableReader>> group = sstables.stream().collect(Collectors.groupingBy(s -> s.getSSTableMetadata().pendingRepair));
@@ -419,18 +433,35 @@ class PendingRepairManager
protected void runMayThrow() throws Exception
{
boolean completed = false;
+ boolean obsoleteSSTables = isTransient && repairedAt > 0;
try
{
- logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, transaction.originals(), sessionID);
- cfs.getCompactionStrategyManager().mutateRepaired(transaction.originals(), repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
+ if (obsoleteSSTables)
+ {
+ logger.info("Obsoleting transient repaired ssatbles");
+ Preconditions.checkState(Iterables.all(transaction.originals(), SSTableReader::isTransient));
+ transaction.obsoleteOriginals();
+ }
+ else
+ {
+ logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, transaction.originals(), sessionID);
+ cfs.getCompactionStrategyManager().mutateRepaired(transaction.originals(), repairedAt, ActiveRepairService.NO_PENDING_REPAIR, false);
+ }
completed = true;
}
finally
{
- // we always abort because mutating metadata isn't guarded by LifecycleTransaction, so this won't roll
- // anything back. Also, we don't want to obsolete the originals. We're only using it to prevent other
- // compactions from marking these sstables compacting, and unmarking them when we're done
- transaction.abort();
+ if (obsoleteSSTables)
+ {
+ transaction.finish();
+ }
+ else
+ {
+ // we abort here because mutating metadata isn't guarded by LifecycleTransaction, so this won't roll
+ // anything back. Also, we don't want to obsolete the originals. We're only using it to prevent other
+ // compactions from marking these sstables compacting, and unmarking them when we're done
+ transaction.abort();
+ }
if (completed)
{
removeSession(sessionID);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index f97b693..aa41051 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -170,7 +170,7 @@ public class Scrubber implements Closeable
}
StatsMetadata metadata = sstable.getSSTableMetadata();
- writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, sstable, transaction));
+ writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, metadata.repairedAt, metadata.pendingRepair, metadata.isTransient, sstable, transaction));
DecoratedKey prevKey = null;
@@ -277,7 +277,7 @@ public class Scrubber implements Closeable
// out of order rows, but no bad rows found - we can keep our repairedAt time
long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : metadata.repairedAt;
SSTableReader newInOrderSstable;
- try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, metadata.pendingRepair, sstable, transaction))
+ try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, metadata.pendingRepair, metadata.isTransient, sstable, transaction))
{
for (Partition partition : outOfOrder)
inOrderWriter.append(partition.unfilteredIterator());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 80453ef..e1406aa 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -68,14 +68,15 @@ public class Upgrader
this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
}
- private SSTableWriter createCompactionWriter(long repairedAt, UUID parentRepair)
+ private SSTableWriter createCompactionWriter(StatsMetadata metadata)
{
MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
return SSTableWriter.create(cfs.newSSTableDescriptor(directory),
estimatedRows,
- repairedAt,
- parentRepair,
+ metadata.repairedAt,
+ metadata.pendingRepair,
+ metadata.isTransient,
cfs.metadata,
sstableMetadataCollector,
SerializationHeader.make(cfs.metadata(), Sets.newHashSet(sstable)),
@@ -91,8 +92,7 @@ public class Upgrader
AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals());
CompactionIterator iter = new CompactionIterator(transaction.opType(), scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
{
- StatsMetadata metadata = sstable.getSSTableMetadata();
- writer.switchWriter(createCompactionWriter(metadata.repairedAt, metadata.pendingRepair));
+ writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata()));
while (iter.hasNext())
writer.append(iter.next());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index db49369..446d527 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -350,6 +350,7 @@ public class Verifier implements Closeable
public RangeOwnHelper(List<Range<Token>> normalizedRanges)
{
this.normalizedRanges = normalizedRanges;
+ Range.assertNormalized(normalizedRanges);
}
/**
@@ -457,7 +458,7 @@ public class Verifier implements Closeable
{
try
{
- sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().pendingRepair);
+ sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getPendingRepair(), sstable.isTransient());
sstable.reloadSSTableMetadata();
cfs.getTracker().notifySSTableRepairedStatusChanged(Collections.singleton(sstable));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 5ddd99c..d72b236 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -57,6 +57,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
protected final long maxAge;
protected final long minRepairedAt;
protected final UUID pendingRepair;
+ protected final boolean isTransient;
protected final SSTableRewriter sstableWriter;
protected final LifecycleTransaction txn;
@@ -91,6 +92,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
sstableWriter = SSTableRewriter.construct(cfs, txn, keepOriginals, maxAge);
minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
pendingRepair = CompactionTask.getPendingRepair(nonExpiredSSTables);
+ isTransient = CompactionTask.getIsTransient(nonExpiredSSTables);
DiskBoundaries db = cfs.getDiskBoundaries();
diskBoundaries = db.positions;
locations = db.directories;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index cda7e38..6180f96 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -72,6 +72,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
estimatedTotalKeys,
minRepairedAt,
pendingRepair,
+ isTransient,
cfs.metadata,
new MetadataCollector(txn.originals(), cfs.metadata().comparator, sstableLevel),
SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 3959b4b..2b93eb4 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -108,6 +108,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
keysPerSSTable,
minRepairedAt,
pendingRepair,
+ isTransient,
cfs.metadata,
new MetadataCollector(txn.originals(), cfs.metadata().comparator, currentLevel),
SerializationHeader.make(cfs.metadata(), txn.originals()),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index c4f84e8..df7eeaf 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -111,6 +111,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
estimatedTotalKeys / estimatedSSTables,
minRepairedAt,
pendingRepair,
+ isTransient,
cfs.metadata,
new MetadataCollector(allSSTables, cfs.metadata().comparator, level),
SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index a4af783..7533f1d 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -107,6 +107,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
currentPartitionsToWrite,
minRepairedAt,
pendingRepair,
+ isTransient,
cfs.metadata,
new MetadataCollector(allSSTables, cfs.metadata().comparator, 0),
SerializationHeader.make(cfs.metadata(), nonExpiredSSTables),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index 9064b0f..bed0958 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -82,18 +82,6 @@ public abstract class PartitionIterators
return new SingletonPartitionIterator(iterator);
}
- public static void consume(PartitionIterator iterator)
- {
- while (iterator.hasNext())
- {
- try (RowIterator partition = iterator.next())
- {
- while (partition.hasNext())
- partition.next();
- }
- }
- }
-
/**
* Wraps the provided iterator so it logs the returned rows for debugging purposes.
* <p>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java b/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
index 5f2e5a0..fa2e653 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraKeyspaceRepairManager.java
@@ -26,8 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.repair.KeyspaceRepairManager;
public class CassandraKeyspaceRepairManager implements KeyspaceRepairManager
@@ -40,9 +39,12 @@ public class CassandraKeyspaceRepairManager implements KeyspaceRepairManager
}
@Override
- public ListenableFuture prepareIncrementalRepair(UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor)
+ public ListenableFuture prepareIncrementalRepair(UUID sessionID,
+ Collection<ColumnFamilyStore> tables,
+ RangesAtEndpoint tokenRanges,
+ ExecutorService executor)
{
- PendingAntiCompaction pac = new PendingAntiCompaction(sessionID, tables, ranges, executor);
+ PendingAntiCompaction pac = new PendingAntiCompaction(sessionID, tables, tokenRanges, executor);
return pac.run();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index 4e0f13d..a205c3c 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.utils.concurrent.Refs;
/**
@@ -126,17 +127,17 @@ public class PendingAntiCompaction
static class AcquisitionCallback implements AsyncFunction<List<AcquireResult>, Object>
{
private final UUID parentRepairSession;
- private final Collection<Range<Token>> ranges;
+ private final RangesAtEndpoint tokenRanges;
- public AcquisitionCallback(UUID parentRepairSession, Collection<Range<Token>> ranges)
+ public AcquisitionCallback(UUID parentRepairSession, RangesAtEndpoint tokenRanges)
{
this.parentRepairSession = parentRepairSession;
- this.ranges = ranges;
+ this.tokenRanges = tokenRanges;
}
ListenableFuture<?> submitPendingAntiCompaction(AcquireResult result)
{
- return CompactionManager.instance.submitPendingAntiCompaction(result.cfs, ranges, result.refs, result.txn, parentRepairSession);
+ return CompactionManager.instance.submitPendingAntiCompaction(result.cfs, tokenRanges, result.refs, result.txn, parentRepairSession);
}
public ListenableFuture apply(List<AcquireResult> results) throws Exception
@@ -177,14 +178,17 @@ public class PendingAntiCompaction
private final UUID prsId;
private final Collection<ColumnFamilyStore> tables;
- private final Collection<Range<Token>> ranges;
+ private final RangesAtEndpoint tokenRanges;
private final ExecutorService executor;
- public PendingAntiCompaction(UUID prsId, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor)
+ public PendingAntiCompaction(UUID prsId,
+ Collection<ColumnFamilyStore> tables,
+ RangesAtEndpoint tokenRanges,
+ ExecutorService executor)
{
this.prsId = prsId;
this.tables = tables;
- this.ranges = ranges;
+ this.tokenRanges = tokenRanges;
this.executor = executor;
}
@@ -194,12 +198,12 @@ public class PendingAntiCompaction
for (ColumnFamilyStore cfs : tables)
{
cfs.forceBlockingFlush();
- ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(new AcquisitionCallable(cfs, ranges, prsId));
+ ListenableFutureTask<AcquireResult> task = ListenableFutureTask.create(new AcquisitionCallable(cfs, tokenRanges.ranges(), prsId));
executor.submit(task);
tasks.add(task);
}
ListenableFuture<List<AcquireResult>> acquisitionResults = Futures.successfulAsList(tasks);
- ListenableFuture compactionResult = Futures.transformAsync(acquisitionResults, new AcquisitionCallback(prsId, ranges), MoreExecutors.directExecutor());
+ ListenableFuture compactionResult = Futures.transformAsync(acquisitionResults, new AcquisitionCallback(prsId, tokenRanges), MoreExecutors.directExecutor());
return compactionResult;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 5252187..c688fdf 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -68,17 +68,18 @@ public class CassandraOutgoingFile implements OutgoingStream
private final ComponentManifest manifest;
private Boolean isFullyContained;
- private final List<Range<Token>> ranges;
+ private final List<Range<Token>> normalizedRanges;
public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref,
- List<SSTableReader.PartitionPositionBounds> sections, Collection<Range<Token>> ranges,
+ List<SSTableReader.PartitionPositionBounds> sections, List<Range<Token>> normalizedRanges,
long estimatedKeys)
{
Preconditions.checkNotNull(ref.get());
+ Range.assertNormalized(normalizedRanges);
this.ref = ref;
this.estimatedKeys = estimatedKeys;
this.sections = sections;
- this.ranges = ImmutableList.copyOf(ranges);
+ this.normalizedRanges = ImmutableList.copyOf(normalizedRanges);
this.filename = ref.get().getFilename();
this.manifest = getComponentManifest(ref.get());
@@ -194,7 +195,7 @@ public class CassandraOutgoingFile implements OutgoingStream
.getCompactionStrategyFor(ref.get());
if (compactionStrategy instanceof LeveledCompactionStrategy)
- return contained(ranges, ref.get());
+ return contained(normalizedRanges, ref.get());
return false;
}
@@ -251,6 +252,6 @@ public class CassandraOutgoingFile implements OutgoingStream
@Override
public String toString()
{
- return "CassandraOutgoingFile{" + ref.get().getFilename() + '}';
+ return "CassandraOutgoingFile{" + filename + '}';
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
index 43667d0..6c2631c 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
@@ -18,19 +18,10 @@
package org.apache.cassandra.db.streaming;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
@@ -39,6 +30,8 @@ import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.IncomingStream;
import org.apache.cassandra.streaming.OutgoingStream;
@@ -49,6 +42,14 @@ import org.apache.cassandra.streaming.TableStreamManager;
import org.apache.cassandra.streaming.messages.StreamMessageHeader;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.Refs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
/**
* Implements the streaming interface for the native cassandra storage engine.
@@ -96,14 +97,14 @@ public class CassandraStreamManager implements TableStreamManager
}
@Override
- public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind previewKind)
+ public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, RangesAtEndpoint replicas, UUID pendingRepair, PreviewKind previewKind)
{
Refs<SSTableReader> refs = new Refs<>();
try
{
- final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(ranges.size());
- for (Range<Token> range : ranges)
- keyRanges.add(Range.makeRowRange(range));
+ final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(replicas.size());
+ for (Replica replica : replicas)
+ keyRanges.add(Range.makeRowRange(replica.range()));
refs.addAll(cfs.selectAndReference(view -> {
Set<SSTableReader> sstables = Sets.newHashSet();
SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL));
@@ -141,11 +142,16 @@ public class CassandraStreamManager implements TableStreamManager
}).refs);
+ List<Range<Token>> normalizedFullRanges = Range.normalize(replicas.filter(Replica::isFull).ranges());
+ List<Range<Token>> normalizedAllRanges = Range.normalize(replicas.ranges());
+ //Create outgoing file streams for ranges possibly skipping repaired ranges in sstables
List<OutgoingStream> streams = new ArrayList<>(refs.size());
- for (SSTableReader sstable: refs)
+ for (SSTableReader sstable : refs)
{
- Ref<SSTableReader> ref = refs.get(sstable);
+ List<Range<Token>> ranges = sstable.isRepaired() ? normalizedFullRanges : normalizedAllRanges;
List<SSTableReader.PartitionPositionBounds> sections = sstable.getPositionsForRanges(ranges);
+
+ Ref<SSTableReader> ref = refs.get(sstable);
if (sections.isEmpty())
{
ref.release();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index fccabfe..572c648 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -156,7 +156,7 @@ public class CassandraStreamReader implements IStreamReader
Preconditions.checkState(streamReceiver instanceof CassandraStreamReceiver);
LifecycleTransaction txn = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).getTransaction();
- RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata()));
+ RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, pendingRepair, false, format, sstableLevel, totalSize, txn, getHeader(cfs.metadata()));
return writer;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java
index d35457e..09490e8 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -60,6 +60,11 @@ public class TableViews extends AbstractCollection<View>
baseTableMetadata = Schema.instance.getTableMetadataRef(id);
}
+    /** @return {@code true} when {@code views} holds at least one element, {@code false} when it is empty */
+    public boolean hasViews()
+    {
+        return views.size() > 0;
+    }
+
public int size()
{
return views.size();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index c727f63..6717297 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -43,6 +43,8 @@ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.repair.SystemDistributedKeyspace;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -135,14 +137,15 @@ class ViewBuilder
}
// Get the local ranges for which the view hasn't already been built nor it's building
- Set<Range<Token>> newRanges = StorageService.instance.getLocalRanges(ksName)
- .stream()
- .map(r -> r.subtractAll(builtRanges))
- .flatMap(Set::stream)
- .map(r -> r.subtractAll(pendingRanges.keySet()))
- .flatMap(Set::stream)
- .collect(Collectors.toSet());
-
+ RangesAtEndpoint replicatedRanges = StorageService.instance.getLocalReplicas(ksName);
+ Replicas.temporaryAssertFull(replicatedRanges);
+ Set<Range<Token>> newRanges = replicatedRanges.ranges()
+ .stream()
+ .map(r -> r.subtractAll(builtRanges))
+ .flatMap(Set::stream)
+ .map(r -> r.subtractAll(pendingRanges.keySet()))
+ .flatMap(Set::stream)
+ .collect(Collectors.toSet());
// If there are no new nor pending ranges we should finish the build
if (newRanges.isEmpty() && pendingRanges.isEmpty())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
index 000477d..7e3ea1b 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -79,7 +79,7 @@ public class ViewManager
{
assert keyspace.getName().equals(update.metadata().keyspace);
- if (coordinatorBatchlog && keyspace.getReplicationStrategy().getReplicationFactor() == 1)
+ if (coordinatorBatchlog && keyspace.getReplicationStrategy().getReplicationFactor().allReplicas == 1)
continue;
if (!forTable(update.metadata().id).updatedViews(update).isEmpty())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/db/view/ViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java
index df16943..ad10d9d 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUtils.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -18,16 +18,17 @@
package org.apache.cassandra.db.view;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Optional;
+import java.util.function.Predicate;
+import com.google.common.collect.Iterables;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.utils.FBUtilities;
public final class ViewUtils
@@ -58,46 +59,51 @@ public final class ViewUtils
*
* @return Optional.empty() if this method is called using a base token which does not belong to this replica
*/
- public static Optional<InetAddressAndPort> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
+ public static Optional<Replica> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
{
AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
- List<InetAddressAndPort> baseEndpoints = new ArrayList<>();
- List<InetAddressAndPort> viewEndpoints = new ArrayList<>();
- for (InetAddressAndPort baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken))
- {
- // An endpoint is local if we're not using Net
- if (!(replicationStrategy instanceof NetworkTopologyStrategy) ||
- DatabaseDescriptor.getEndpointSnitch().getDatacenter(baseEndpoint).equals(localDataCenter))
- baseEndpoints.add(baseEndpoint);
- }
+ EndpointsForToken naturalBaseReplicas = replicationStrategy.getNaturalReplicasForToken(baseToken);
+ EndpointsForToken naturalViewReplicas = replicationStrategy.getNaturalReplicasForToken(viewToken);
- for (InetAddressAndPort viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken))
- {
- // If we are a base endpoint which is also a view replica, we use ourselves as our view replica
- if (viewEndpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
- return Optional.of(viewEndpoint);
+ Optional<Replica> localReplica = Iterables.tryFind(naturalViewReplicas, Replica::isLocal).toJavaUtil();
+ if (localReplica.isPresent())
+ return localReplica;
- // We have to remove any endpoint which is shared between the base and the view, as it will select itself
- // and throw off the counts otherwise.
- if (baseEndpoints.contains(viewEndpoint))
- baseEndpoints.remove(viewEndpoint);
- else if (!(replicationStrategy instanceof NetworkTopologyStrategy) ||
- DatabaseDescriptor.getEndpointSnitch().getDatacenter(viewEndpoint).equals(localDataCenter))
- viewEndpoints.add(viewEndpoint);
- }
+ // We only select replicas from our own DC
+ // TODO: this is poor encapsulation, leaking implementation details of replication strategy
+ Predicate<Replica> isLocalDC = r -> !(replicationStrategy instanceof NetworkTopologyStrategy)
+ || DatabaseDescriptor.getEndpointSnitch().getDatacenter(r).equals(localDataCenter);
+
+ // We have to remove any endpoint which is shared between the base and the view, as it will select itself
+ // and throw off the counts otherwise.
+ EndpointsForToken baseReplicas = naturalBaseReplicas.filter(
+ r -> !naturalViewReplicas.endpoints().contains(r.endpoint()) && isLocalDC.test(r)
+ );
+ EndpointsForToken viewReplicas = naturalViewReplicas.filter(
+ r -> !naturalBaseReplicas.endpoints().contains(r.endpoint()) && isLocalDC.test(r)
+ );
// The replication strategy will be the same for the base and the view, as they must belong to the same keyspace.
// Since the same replication strategy is used, the same placement should be used and we should get the same
// number of replicas for all of the tokens in the ring.
- assert baseEndpoints.size() == viewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view";
- int baseIdx = baseEndpoints.indexOf(FBUtilities.getBroadcastAddressAndPort());
+ assert baseReplicas.size() == viewReplicas.size() : "Replication strategy should have the same number of endpoints for the base and the view";
+
+ int baseIdx = -1;
+ for (int i=0; i<baseReplicas.size(); i++)
+ {
+ if (baseReplicas.get(i).isLocal())
+ {
+ baseIdx = i;
+ break;
+ }
+ }
if (baseIdx < 0)
//This node is not a base replica of this key, so we return empty
return Optional.empty();
- return Optional.of(viewEndpoints.get(baseIdx));
+ return Optional.of(viewReplicas.get(baseIdx));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index 974d08e..e03c5ec 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.dht;
import java.io.Serializable;
import java.util.*;
+import java.util.function.Predicate;
import org.apache.commons.lang3.ObjectUtils;
@@ -529,7 +530,7 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
/**
* Helper class to check if a token is contained within a given collection of ranges
*/
- public static class OrderedRangeContainmentChecker
+ public static class OrderedRangeContainmentChecker implements Predicate<Token>
{
private final Iterator<Range<Token>> normalizedRangesIterator;
private Token lastToken = null;
@@ -550,7 +551,8 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
* @param t token to check, must be larger than or equal to the last token passed
* @return true if the token is contained within the ranges given to the constructor.
*/
- public boolean contains(Token t)
+ @Override
+ public boolean test(Token t)
{
assert lastToken == null || lastToken.compareTo(t) <= 0;
lastToken = t;
@@ -567,4 +569,25 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
}
}
}
+
+    /**
+     * Asserts that {@code ranges} is ordered by strictly increasing left bound and that no range intersects its immediate predecessor.
+     *
+     * @param ranges the ranges to check, in iteration order; an empty or single-element list always passes
+     * @throws AssertionError if a range's left bound does not strictly follow its predecessor's, or the two ranges intersect
+     */
+    public static <T extends RingPosition<T>> void assertNormalized(List<Range<T>> ranges)
+    {
+        Range<T> lastRange = null;
+        for (Range<T> range : ranges)
+        {
+            if (lastRange != null && (lastRange.left.compareTo(range.left) >= 0 || lastRange.intersects(range)))
+            {
+                throw new AssertionError(String.format("Ranges aren't properly normalized. lastRange %s, range %s, compareTo %d, intersects %b, all ranges %s%n",
+                                                       lastRange, range, lastRange.compareTo(range), lastRange.intersects(range), ranges));
+            }
+            // Advance the comparison point so each range is checked against its immediate predecessor, not only the first range.
+            lastRange = range;
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
index b90bc96..4b98b97 100644
--- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
+++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
@@ -19,25 +19,27 @@
package org.apache.cassandra.dht;
import java.math.BigInteger;
-import java.net.InetAddress;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
-import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import org.apache.cassandra.locator.EndpointsByRange;
+import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.locator.Replica;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.Replicas;
import org.psjava.algo.graph.flownetwork.FordFulkersonAlgorithm;
import org.psjava.algo.graph.flownetwork.MaximumFlowAlgorithm;
import org.psjava.algo.graph.flownetwork.MaximumFlowAlgorithmResult;
@@ -73,20 +75,20 @@ public class RangeFetchMapCalculator
{
private static final Logger logger = LoggerFactory.getLogger(RangeFetchMapCalculator.class);
private static final long TRIVIAL_RANGE_LIMIT = 1000;
- private final Multimap<Range<Token>, InetAddressAndPort> rangesWithSources;
- private final Collection<RangeStreamer.ISourceFilter> sourceFilters;
+ private final EndpointsByRange rangesWithSources;
+ private final Predicate<Replica> sourceFilters;
private final String keyspace;
//We need two Vertices to act as source and destination in the algorithm
private final Vertex sourceVertex = OuterVertex.getSourceVertex();
private final Vertex destinationVertex = OuterVertex.getDestinationVertex();
private final Set<Range<Token>> trivialRanges;
- public RangeFetchMapCalculator(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources,
- Collection<RangeStreamer.ISourceFilter> sourceFilters,
+ public RangeFetchMapCalculator(EndpointsByRange rangesWithSources,
+ Collection<Predicate<Replica>> sourceFilters,
String keyspace)
{
this.rangesWithSources = rangesWithSources;
- this.sourceFilters = sourceFilters;
+ this.sourceFilters = Predicates.and(sourceFilters);
this.keyspace = keyspace;
this.trivialRanges = rangesWithSources.keySet()
.stream()
@@ -158,14 +160,15 @@ public class RangeFetchMapCalculator
boolean localDCCheck = true;
while (!added)
{
- List<InetAddressAndPort> srcs = new ArrayList<>(rangesWithSources.get(trivialRange));
// sort with the endpoint having the least number of streams first:
- srcs.sort(Comparator.comparingInt(o -> optimisedMap.get(o).size()));
- for (InetAddressAndPort src : srcs)
+ EndpointsForRange replicas = rangesWithSources.get(trivialRange)
+ .sorted(Comparator.comparingInt(o -> optimisedMap.get(o.endpoint()).size()));
+ Replicas.temporaryAssertFull(replicas);
+ for (Replica replica : replicas)
{
- if (passFilters(src, localDCCheck))
+ if (passFilters(replica, localDCCheck))
{
- fetchMap.put(src, trivialRange);
+ fetchMap.put(replica.endpoint(), trivialRange);
added = true;
break;
}
@@ -347,15 +350,16 @@ public class RangeFetchMapCalculator
private boolean addEndpoints(MutableCapacityGraph<Vertex, Integer> capacityGraph, RangeVertex rangeVertex, boolean localDCCheck)
{
boolean sourceFound = false;
- for (InetAddressAndPort endpoint : rangesWithSources.get(rangeVertex.getRange()))
+ Replicas.temporaryAssertFull(rangesWithSources.get(rangeVertex.getRange()));
+ for (Replica replica : rangesWithSources.get(rangeVertex.getRange()))
{
- if (passFilters(endpoint, localDCCheck))
+ if (passFilters(replica, localDCCheck))
{
sourceFound = true;
// if we pass filters, it means that we don't filter away localhost and we can count it as a source:
- if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
+ if (replica.isLocal())
continue; // but don't add localhost to the graph to avoid streaming locally
- final Vertex endpointVertex = new EndpointVertex(endpoint);
+ final Vertex endpointVertex = new EndpointVertex(replica.endpoint());
capacityGraph.insertVertex(rangeVertex);
capacityGraph.insertVertex(endpointVertex);
capacityGraph.addEdge(rangeVertex, endpointVertex, Integer.MAX_VALUE);
@@ -364,26 +368,20 @@ public class RangeFetchMapCalculator
return sourceFound;
}
- private boolean isInLocalDC(InetAddressAndPort endpoint)
+ private boolean isInLocalDC(Replica replica)
{
- return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint));
+ return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica));
}
/**
*
- * @param endpoint Endpoint to check
+ * @param replica Replica to check
* @param localDCCheck Allow endpoints with local DC
* @return True if filters pass this endpoint
*/
- private boolean passFilters(final InetAddressAndPort endpoint, boolean localDCCheck)
+ private boolean passFilters(final Replica replica, boolean localDCCheck)
{
- for (RangeStreamer.ISourceFilter filter : sourceFilters)
- {
- if (!filter.shouldInclude(endpoint))
- return false;
- }
-
- return !localDCCheck || isInLocalDC(endpoint);
+ return sourceFilters.apply(replica) && (!localDCCheck || isInLocalDC(replica));
}
private static abstract class Vertex
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org