You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2021/11/30 16:07:00 UTC
[cassandra] 02/03: [CEP-10] Cluster and Code Simulations: Minor improvements
This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit eae581a5f07c340594f6af47bb558693ef363611
Author: Benedict Elliott Smith <be...@apple.com>
AuthorDate: Wed Nov 17 14:34:23 2021 +0000
[CEP-10] Cluster and Code Simulations: Minor improvements
- Simplify Semaphore
- Future improvements
- ScheduledExecutorPlus improvements for simulator compatibility
- Debug leaks in Ref or BufferPool
- Support use of TokenMetadata without initialising Cassandra
- Additional system properties and simulator flags
- Permit Clock initialisation within separate ClassLoader
- Introduce BallotGenerator
patch by Benedict; reviewed by Sam Tunnicliffe for CASSANDRA-17008
---
.../cassandra/auth/CassandraRoleManager.java | 2 +-
.../concurrent/ScheduledExecutorPlus.java | 24 +++
.../ScheduledThreadPoolExecutorPlus.java | 27 +++
.../cassandra/concurrent/SyncFutureTask.java | 5 +-
.../config/CassandraRelevantProperties.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 4 +
.../cql3/statements/BatchUpdatesCollector.java | 2 +-
src/java/org/apache/cassandra/db/Keyspace.java | 2 +-
src/java/org/apache/cassandra/db/Mutation.java | 2 +-
src/java/org/apache/cassandra/db/ReadCommand.java | 2 +-
.../cassandra/db/ReadExecutionController.java | 2 +-
.../db/commitlog/AbstractCommitLogService.java | 3 +-
.../cassandra/db/monitoring/MonitorableImpl.java | 2 +-
.../cassandra/db/monitoring/MonitoringTask.java | 2 +-
.../org/apache/cassandra/gms/FailureDetector.java | 4 +-
.../apache/cassandra/hints/HintsDispatcher.java | 2 +-
.../apache/cassandra/locator/TokenMetadata.java | 76 ++++----
src/java/org/apache/cassandra/metrics/Sampler.java | 2 +-
.../cassandra/net/AbstractMessageHandler.java | 2 +-
.../cassandra/net/InboundMessageHandler.java | 2 +-
.../cassandra/net/InboundMessageHandlers.java | 2 +-
src/java/org/apache/cassandra/net/Message.java | 2 +-
.../apache/cassandra/net/OutboundConnection.java | 2 +-
.../org/apache/cassandra/net/RequestCallbacks.java | 2 +-
.../apache/cassandra/net/ResponseVerbHandler.java | 2 +-
.../apache/cassandra/schema/MigrationManager.java | 4 +
.../apache/cassandra/schema/SchemaKeyspace.java | 3 +
.../cassandra/service/ActiveRepairService.java | 3 +
.../org/apache/cassandra/service/StorageProxy.java | 28 ++-
.../cassandra/service/paxos/BallotGenerator.java | 75 ++++++++
.../cassandra/service/paxos/ProposeCallback.java | 3 +-
.../cassandra/transport/CQLMessageHandler.java | 2 +-
src/java/org/apache/cassandra/utils/Clock.java | 31 +++-
.../org/apache/cassandra/utils/MonotonicClock.java | 19 +-
.../Nemesis.java} | 27 ++-
src/java/org/apache/cassandra/utils/Simulate.java | 56 ++++++
.../cassandra/utils/concurrent/AbstractFuture.java | 26 ++-
.../cassandra/utils/concurrent/AsyncFuture.java | 2 +-
.../cassandra/utils/concurrent/Awaitable.java | 135 +++++++-------
.../apache/cassandra/utils/concurrent/Future.java | 11 ++
.../cassandra/utils/concurrent/ListenerList.java | 18 +-
.../org/apache/cassandra/utils/concurrent/Ref.java | 29 ++-
.../cassandra/utils/concurrent/Semaphore.java | 200 ++-------------------
.../cassandra/utils/concurrent/SyncFuture.java | 4 +-
.../apache/cassandra/utils/memory/BufferPool.java | 19 +-
.../apache/cassandra/utils/memory/HeapPool.java | 4 +
.../cassandra/utils/memory/LongBufferPoolTest.java | 4 +-
.../concurrent/AbstractExecutorPlusTest.java | 8 +-
.../cassandra/utils/concurrent/SemaphoreTest.java | 8 +-
49 files changed, 515 insertions(+), 382 deletions(-)
diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
index 0e49056..b813b55 100644
--- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -385,7 +385,7 @@ public class CassandraRoleManager implements IRoleManager
protected void scheduleSetupTask(final Callable<Void> setupTask)
{
// The delay is to give the node a chance to see its peers before attempting the operation
- ScheduledExecutors.optionalTasks.schedule(() -> {
+ ScheduledExecutors.optionalTasks.scheduleSelfRecurring(() -> {
isClusterReady = true;
try
{
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java
index ecf073d..a2b033a 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java
@@ -19,6 +19,8 @@
package org.apache.cassandra.concurrent;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.cassandra.utils.Shared;
@@ -27,4 +29,26 @@ import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
@Shared(scope = SIMULATION)
public interface ScheduledExecutorPlus extends ExecutorPlus, ScheduledExecutorService
{
+ /**
+ * Schedule an action that is recurring but self-administered.
+ */
+ ScheduledFuture<?> scheduleSelfRecurring(Runnable run, long delay, TimeUnit units);
+
+ /**
+ * Schedule a timeout action. This method is primarily used by the Simulator to modify its
+ * scheduling behaviour with respect to this operation.
+ */
+ ScheduledFuture<?> scheduleAt(Runnable run, long deadline);
+
+ /**
+ * Schedule a timeout action. This method is primarily used by the Simulator to modify its
+ * scheduling behaviour with respect to this operation.
+ */
+ ScheduledFuture<?> scheduleTimeoutAt(Runnable run, long deadline);
+
+ /**
+ * Schedule a timeout action. This method is primarily used by the Simulator to modify its
+ * scheduling behaviour with respect to this operation.
+ */
+ ScheduledFuture<?> scheduleTimeoutWithDelay(Runnable run, long delay, TimeUnit units);
}
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledThreadPoolExecutorPlus.java b/src/java/org/apache/cassandra/concurrent/ScheduledThreadPoolExecutorPlus.java
index efd284f..0ab09a4 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledThreadPoolExecutorPlus.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledThreadPoolExecutorPlus.java
@@ -28,8 +28,11 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.service.StorageService;
+import static com.google.common.primitives.Longs.max;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.concurrent.ExecutionFailure.propagating;
import static org.apache.cassandra.concurrent.ExecutionFailure.suppressing;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
/**
* Like ExecutorPlus, ScheduledThreadPoolExecutorPlus always
@@ -97,6 +100,30 @@ public class ScheduledThreadPoolExecutorPlus extends ScheduledThreadPoolExecutor
return super.scheduleWithFixedDelay(suppressing(task), initialDelay, delay, unit);
}
+ @Override
+ public ScheduledFuture<?> scheduleSelfRecurring(Runnable run, long delay, TimeUnit units)
+ {
+ return schedule(run, delay, units);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAt(Runnable run, long deadline)
+ {
+ return schedule(run, max(0, deadline - nanoTime()), NANOSECONDS);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleTimeoutAt(Runnable run, long deadline)
+ {
+ return scheduleTimeoutWithDelay(run, max(0, deadline - nanoTime()), NANOSECONDS);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleTimeoutWithDelay(Runnable run, long delay, TimeUnit units)
+ {
+ return schedule(run, delay, units);
+ }
+
/*======== BEGIN DIRECT COPY OF ThreadPoolExecutorPlus ===============*/
private <T extends Runnable> T addTask(T task)
diff --git a/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java b/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java
index 4f4aa67..4885821 100644
--- a/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java
+++ b/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java
@@ -60,7 +60,10 @@ public class SyncFutureTask<T> extends SyncFuture<T> implements RunnableFuture<T
try
{
if (!setUncancellable())
- throw new IllegalStateException();
+ {
+ if (isCancelled()) return;
+ else throw new IllegalStateException();
+ }
if (!trySuccess(call.call()))
throw new IllegalStateException();
diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 43db1c3..807516c 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -228,6 +228,7 @@ public enum CassandraRelevantProperties
// properties for debugging simulator ASM output
TEST_SIMULATOR_PRINT_ASM("cassandra.test.simulator.print_asm", "none"),
TEST_SIMULATOR_PRINT_ASM_TYPES("cassandra.test.simulator.print_asm_types", ""),
+ TEST_SIMULATOR_LIVENESS_CHECK("cassandra.test.simulator.livenesscheck", "true"),
// determinism properties for testing
DETERMINISM_SSTABLE_COMPRESSION_DEFAULT("cassandra.sstable_compression_default", "true"),
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 8bb52d4..8265fd2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -70,6 +70,7 @@ import org.apache.cassandra.security.EncryptionContext;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.CacheService.CacheType;
import org.apache.cassandra.service.paxos.Paxos;
+import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.lang3.ArrayUtils;
@@ -80,6 +81,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.OS_ARCH;
import static org.apache.cassandra.config.CassandraRelevantProperties.SUN_ARCH_DATA_MODEL;
import static org.apache.cassandra.io.util.FileUtils.ONE_GB;
import static org.apache.cassandra.io.util.FileUtils.ONE_MB;
+import static org.apache.cassandra.utils.Clock.Global.logInitializationOutcome;
public class DatabaseDescriptor
{
@@ -839,6 +841,8 @@ public class DatabaseDescriptor
}
Paxos.setPaxosVariant(conf.paxos_variant);
+
+ logInitializationOutcome(logger);
}
@VisibleForTesting
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
index cb88bdd..e5136f4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
/**
* Utility class to collect updates.
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 795230e..285d08c 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -78,7 +78,7 @@ import org.apache.cassandra.utils.concurrent.Promise;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
/**
* It represents a Keyspace.
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index a30b567..6350082 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -42,7 +42,7 @@ import org.apache.cassandra.utils.concurrent.Future;
import static org.apache.cassandra.net.MessagingService.VERSION_30;
import static org.apache.cassandra.net.MessagingService.VERSION_3014;
import static org.apache.cassandra.net.MessagingService.VERSION_40;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
public class Mutation implements IMutation
{
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index f14240b..3bf0d6d 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -72,7 +72,7 @@ import org.apache.cassandra.utils.ObjectSizes;
import static com.google.common.collect.Iterables.any;
import static com.google.common.collect.Iterables.filter;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP;
/**
diff --git a/src/java/org/apache/cassandra/db/ReadExecutionController.java b/src/java/org/apache/cassandra/db/ReadExecutionController.java
index 5bcd84b..2fbe3ac 100644
--- a/src/java/org/apache/cassandra/db/ReadExecutionController.java
+++ b/src/java/org/apache/cassandra/db/ReadExecutionController.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.concurrent.OpOrder;
-import static org.apache.cassandra.utils.MonotonicClock.preciseTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime;
public class ReadExecutionController implements AutoCloseable
{
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index be3f8cd..6b5378f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -45,6 +45,7 @@ import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL;
import static org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime;
import static org.apache.cassandra.utils.concurrent.Semaphore.newSemaphore;
import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
@@ -151,7 +152,7 @@ public abstract class AbstractCommitLogService
throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
syncIntervalNanos * 1e-6));
- SyncRunnable sync = new SyncRunnable(MonotonicClock.preciseTime);
+ SyncRunnable sync = new SyncRunnable(preciseTime);
executor = executorFactory().infiniteLoop(name, sync, SAFE, NON_DAEMON, SYNCHRONIZED);
}
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
index a6e7947..31b5404 100644
--- a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
@@ -18,7 +18,7 @@
package org.apache.cassandra.db.monitoring;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
public abstract class MonitorableImpl implements Monitorable
{
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
index 52d6160..d681e4b 100644
--- a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.utils.NoSpamLogger;
import static java.lang.System.getProperty;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue;
/**
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index 40a2de5..6db41de 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -28,7 +28,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.management.openmbean.*;
-import org.apache.cassandra.io.util.File;
+
import org.apache.cassandra.locator.Replica;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +41,7 @@ import org.apache.cassandra.utils.MBeanWrapper;
import static org.apache.cassandra.config.CassandraRelevantProperties.LINE_SEPARATOR;
import static org.apache.cassandra.config.DatabaseDescriptor.newFailureDetector;
-import static org.apache.cassandra.utils.MonotonicClock.preciseTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime;
/**
* This FailureDetector is an implementation of the paper titled
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 2b6d9a3..b627338 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.utils.concurrent.Condition;
import static org.apache.cassandra.hints.HintsDispatcher.Callback.Outcome.*;
import static org.apache.cassandra.metrics.HintsServiceMetrics.updateDelayMetrics;
import static org.apache.cassandra.net.Verb.HINT_REQ;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
/**
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index fb9d43b..202bd6a 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -862,42 +862,8 @@ public class TokenMetadata
{
TokenMetadataDiagnostics.pendingRangeCalculationStarted(this, keyspaceName);
- // create clone of current state
- BiMultiValMap<Token, InetAddressAndPort> bootstrapTokensClone;
- Set<InetAddressAndPort> leavingEndpointsClone;
- Set<Pair<Token, InetAddressAndPort>> movingEndpointsClone;
- TokenMetadata metadata;
-
- lock.readLock().lock();
- try
- {
-
- if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
- {
- if (logger.isTraceEnabled())
- logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
- if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
- {
- if (logger.isTraceEnabled())
- logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
- pendingRanges.put(keyspaceName, new PendingRangeMaps());
-
- return;
- }
- }
+ unsafeCalculatePendingRanges(strategy, keyspaceName);
- bootstrapTokensClone = new BiMultiValMap<>(this.bootstrapTokens);
- leavingEndpointsClone = new HashSet<>(this.leavingEndpoints);
- movingEndpointsClone = new HashSet<>(this.movingEndpoints);
- metadata = this.cloneOnlyTokenMap();
- }
- finally
- {
- lock.readLock().unlock();
- }
-
- pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokensClone,
- leavingEndpointsClone, movingEndpointsClone));
if (logger.isDebugEnabled())
logger.debug("Starting pending range calculation for {}", keyspaceName);
@@ -910,6 +876,46 @@ public class TokenMetadata
}
}
+ public void unsafeCalculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
+ {
+ // create clone of current state
+ BiMultiValMap<Token, InetAddressAndPort> bootstrapTokensClone;
+ Set<InetAddressAndPort> leavingEndpointsClone;
+ Set<Pair<Token, InetAddressAndPort>> movingEndpointsClone;
+ TokenMetadata metadata;
+
+ lock.readLock().lock();
+ try
+ {
+
+ if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
+ if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
+ pendingRanges.put(keyspaceName, new PendingRangeMaps());
+
+ return;
+ }
+ }
+
+ bootstrapTokensClone = new BiMultiValMap<>(this.bootstrapTokens);
+ leavingEndpointsClone = new HashSet<>(this.leavingEndpoints);
+ movingEndpointsClone = new HashSet<>(this.movingEndpoints);
+ metadata = this.cloneOnlyTokenMap();
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+
+ pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokensClone,
+ leavingEndpointsClone, movingEndpointsClone));
+ }
+
/**
* @see TokenMetadata#calculatePendingRanges(AbstractReplicationStrategy, String)
*/
diff --git a/src/java/org/apache/cassandra/metrics/Sampler.java b/src/java/org/apache/cassandra/metrics/Sampler.java
index 90cc90c..b3d0f21 100644
--- a/src/java/org/apache/cassandra/metrics/Sampler.java
+++ b/src/java/org/apache/cassandra/metrics/Sampler.java
@@ -40,7 +40,7 @@ public abstract class Sampler<T>
}
@VisibleForTesting
- MonotonicClock clock = MonotonicClock.approxTime;
+ MonotonicClock clock = MonotonicClock.Global.approxTime;
@VisibleForTesting
static final ExecutorPlus samplerExecutor = executorFactory()
diff --git a/src/java/org/apache/cassandra/net/AbstractMessageHandler.java b/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
index 1045f28..e2cf68d 100644
--- a/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
+++ b/src/java/org/apache/cassandra/net/AbstractMessageHandler.java
@@ -44,7 +44,7 @@ import org.apache.cassandra.net.ResourceLimits.Limit;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.apache.cassandra.net.Crc.InvalidCrc;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
/**
* Core logic for handling inbound message deserialization and execution (in tandem with {@link FrameDecoder}).
diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandler.java b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
index c1b51be..e12fcec 100644
--- a/src/java/org/apache/cassandra/net/InboundMessageHandler.java
+++ b/src/java/org/apache/cassandra/net/InboundMessageHandler.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.NoSpamLogger;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
/**
* Implementation of {@link AbstractMessageHandler} for processing internode messages from peers.
diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandlers.java b/src/java/org/apache/cassandra/net/InboundMessageHandlers.java
index a706557..c7b9463 100644
--- a/src/java/org/apache/cassandra/net/InboundMessageHandlers.java
+++ b/src/java/org/apache/cassandra/net/InboundMessageHandlers.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.metrics.InternodeInboundMetrics;
import org.apache.cassandra.net.Message.Header;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
/**
* An aggregation of {@link InboundMessageHandler}s for all connections from a peer.
diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java
index 802c79f..8fe8971 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -57,7 +57,7 @@ import static org.apache.cassandra.net.MessagingService.VERSION_3014;
import static org.apache.cassandra.net.MessagingService.VERSION_30;
import static org.apache.cassandra.net.MessagingService.VERSION_40;
import static org.apache.cassandra.net.MessagingService.instance;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
import static org.apache.cassandra.utils.vint.VIntCoding.computeUnsignedVIntSize;
import static org.apache.cassandra.utils.vint.VIntCoding.getUnsignedVInt;
import static org.apache.cassandra.utils.vint.VIntCoding.skipUnsignedVInt;
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java
index fbf0c73..c2aecb0 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -68,7 +68,7 @@ import static org.apache.cassandra.net.ResourceLimits.*;
import static org.apache.cassandra.net.ResourceLimits.Outcome.*;
import static org.apache.cassandra.net.SocketFactory.*;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
import static org.apache.cassandra.utils.Throwables.isCausedBy;
import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java b/src/java/org/apache/cassandra/net/RequestCallbacks.java
index 6275c15..fa1a03e 100644
--- a/src/java/org/apache/cassandra/net/RequestCallbacks.java
+++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java
@@ -49,7 +49,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.concurrent.Stage.INTERNAL_RESPONSE;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
-import static org.apache.cassandra.utils.MonotonicClock.preciseTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime;
/**
* An expiring map of request callbacks.
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 369e5f4..1cee468 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.tracing.Tracing;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
class ResponseVerbHandler implements IVerbHandler
{
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java
index 6fbfc5d..8e485e0 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -23,6 +23,8 @@ import java.lang.management.ManagementFactory;
import java.util.function.LongSupplier;
import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.utils.Simulate;
import org.apache.cassandra.utils.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +44,9 @@ import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.concurrent.Stage.MIGRATION;
import static org.apache.cassandra.net.Verb.SCHEMA_PUSH_REQ;
+import static org.apache.cassandra.utils.Simulate.With.GLOBAL_CLOCK;
+@Simulate(with = GLOBAL_CLOCK)
public class MigrationManager
{
private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class);
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 6d5e331..90859ce 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Simulate;
import static java.lang.String.format;
@@ -58,6 +59,7 @@ import static java.util.stream.Collectors.toSet;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
import static org.apache.cassandra.schema.SchemaKeyspaceTables.*;
+import static org.apache.cassandra.utils.Simulate.With.GLOBAL_CLOCK;
/**
* system_schema.* tables and methods for manipulating them.
@@ -294,6 +296,7 @@ public final class SchemaKeyspace
/**
* Add entries to system_schema.* for the hardcoded system keyspaces
*/
+ @Simulate(with = GLOBAL_CLOCK)
static void saveSystemKeyspacesSchema()
{
KeyspaceMetadata system = Schema.instance.getKeyspaceMetadata(SchemaConstants.SYSTEM_KEYSPACE_NAME);
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 322fd18..7d0a290 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.config.Config;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.locator.EndpointsByRange;
import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.utils.Simulate;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,6 +107,7 @@ import static org.apache.cassandra.config.DatabaseDescriptor.*;
import static org.apache.cassandra.net.Message.out;
import static org.apache.cassandra.net.Verb.PREPARE_MSG;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+import static org.apache.cassandra.utils.Simulate.With.MONITORS;
import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
/**
@@ -122,6 +124,7 @@ import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownL
* The creation of a repair session is done through the submitRepairSession that
* returns a future on the completion of that session.
*/
+@Simulate(with = MONITORS)
public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener, ActiveRepairServiceMBean
{
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 62bd01a..3883062 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -141,6 +141,7 @@ import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static com.google.common.collect.Iterables.concat;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.cassandra.db.ConsistencyLevel.SERIAL;
import static org.apache.cassandra.net.Message.out;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casWriteMetrics;
@@ -152,6 +153,8 @@ import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetr
import static org.apache.cassandra.net.NoPayload.noPayload;
import static org.apache.cassandra.net.Verb.*;
import static org.apache.cassandra.service.BatchlogResponseHandler.BatchlogCleanup;
+import static org.apache.cassandra.service.paxos.BallotGenerator.Global.nextBallotTimestampMicros;
+import static org.apache.cassandra.service.paxos.BallotGenerator.Global.randomBallot;
import static org.apache.cassandra.service.paxos.PrepareVerbHandler.doPrepare;
import static org.apache.cassandra.service.paxos.ProposeVerbHandler.doPropose;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
@@ -335,7 +338,6 @@ public class StorageProxy implements StorageProxyMBean
consistencyForPaxos,
consistencyForCommit,
consistencyForCommit,
- state,
queryStartNanoTime,
casWriteMetrics,
updateProposer);
@@ -430,7 +432,6 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistencyForPaxos,
ConsistencyLevel consistencyForReplayCommits,
ConsistencyLevel consistencyForCommit,
- ClientState state,
long queryStartNanoTime,
CASClientRequestMetrics casMetrics,
Supplier<Pair<PartitionUpdate, RowIterator>> createUpdateProposal)
@@ -457,8 +458,7 @@ public class StorageProxy implements StorageProxyMBean
replicaPlan,
consistencyForPaxos,
consistencyForReplayCommits,
- casMetrics,
- state);
+ casMetrics);
final UUID ballot = pair.ballot;
contentions += pair.contentions;
@@ -525,8 +525,7 @@ public class StorageProxy implements StorageProxyMBean
ReplicaPlan.ForPaxosWrite paxosPlan,
ConsistencyLevel consistencyForPaxos,
ConsistencyLevel consistencyForCommit,
- CASClientRequestMetrics casMetrics,
- ClientState state)
+ CASClientRequestMetrics casMetrics)
throws WriteTimeoutException, WriteFailureException
{
long timeoutNanos = DatabaseDescriptor.getCasContentionTimeout(NANOSECONDS);
@@ -540,10 +539,10 @@ public class StorageProxy implements StorageProxyMBean
// in progress (#5667). Lastly, we don't want to use a timestamp that is older than the last one assigned by ClientState or operations may appear
// out-of-order (#7801).
long minTimestampMicrosToUse = summary == null ? Long.MIN_VALUE : 1 + UUIDGen.microsTimestamp(summary.mostRecentInProgressCommit.ballot);
- long ballotMicros = state.getTimestampForPaxos(minTimestampMicrosToUse);
+ long ballotMicros = nextBallotTimestampMicros(minTimestampMicrosToUse);
// Note that ballotMicros is not guaranteed to be unique if two proposal are being handled concurrently by the same coordinator. But we still
// need ballots to be unique for each proposal so we have to use getRandomTimeUUIDFromMicros.
- UUID ballot = UUIDGen.getRandomTimeUUIDFromMicros(ballotMicros);
+ UUID ballot = randomBallot(ballotMicros, consistencyForPaxos == SERIAL);
// prepare
try
@@ -1810,7 +1809,6 @@ public class StorageProxy implements StorageProxyMBean
consistencyLevel,
consistencyForReplayCommitsOrFetch,
ConsistencyLevel.ANY,
- state,
start,
casReadMetrics,
updateProposer);
@@ -2077,12 +2075,12 @@ public class StorageProxy implements StorageProxyMBean
}
else
{
- MessagingService.instance().metrics.recordSelfDroppedMessage(verb, MonotonicClock.approxTime.now() - approxCreationTimeNanos, NANOSECONDS);
+ MessagingService.instance().metrics.recordSelfDroppedMessage(verb, MonotonicClock.Global.approxTime.now() - approxCreationTimeNanos, NANOSECONDS);
handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.UNKNOWN);
}
if (!readRejected)
- MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(), MonotonicClock.approxTime.now() - approxCreationTimeNanos, NANOSECONDS);
+ MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(), MonotonicClock.Global.approxTime.now() - approxCreationTimeNanos, NANOSECONDS);
}
catch (Throwable t)
{
@@ -2375,13 +2373,13 @@ public class StorageProxy implements StorageProxyMBean
public DroppableRunnable(Verb verb)
{
- this.approxCreationTimeNanos = MonotonicClock.approxTime.now();
+ this.approxCreationTimeNanos = MonotonicClock.Global.approxTime.now();
this.verb = verb;
}
public final void run()
{
- long approxCurrentTimeNanos = MonotonicClock.approxTime.now();
+ long approxCurrentTimeNanos = MonotonicClock.Global.approxTime.now();
long expirationTimeNanos = verb.expiresAtNanos(approxCreationTimeNanos);
if (approxCurrentTimeNanos > expirationTimeNanos)
{
@@ -2408,7 +2406,7 @@ public class StorageProxy implements StorageProxyMBean
*/
private static abstract class LocalMutationRunnable implements Runnable
{
- private final long approxCreationTimeNanos = MonotonicClock.approxTime.now();
+ private final long approxCreationTimeNanos = MonotonicClock.Global.approxTime.now();
private final Replica localReplica;
@@ -2420,7 +2418,7 @@ public class StorageProxy implements StorageProxyMBean
public final void run()
{
final Verb verb = verb();
- long nowNanos = MonotonicClock.approxTime.now();
+ long nowNanos = MonotonicClock.Global.approxTime.now();
long expirationTimeNanos = verb.expiresAtNanos(approxCreationTimeNanos);
if (nowNanos > expirationTimeNanos)
{
diff --git a/src/java/org/apache/cassandra/service/paxos/BallotGenerator.java b/src/java/org/apache/cassandra/service/paxos/BallotGenerator.java
new file mode 100644
index 0000000..d031f38
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/paxos/BallotGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.paxos;
+
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.utils.Shared;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
+public interface BallotGenerator
+{
+ static class Default implements BallotGenerator
+ {
+ public UUID randomBallot(long whenInMicros, boolean isSerial)
+ {
+ return UUIDGen.getRandomTimeUUIDFromMicros(whenInMicros, isSerial ? 2 : 1);
+ }
+
+ public UUID randomBallot(long fromInMicros, long toInMicros, boolean isSerial)
+ {
+ long timestampMicros = ThreadLocalRandom.current().nextLong(fromInMicros, toInMicros);
+ return randomBallot(timestampMicros, isSerial);
+ }
+
+ public long nextBallotTimestampMicros(long minTimestamp)
+ {
+ return ClientState.getTimestampForPaxos(minTimestamp);
+ }
+
+ public long prevBallotTimestampMicros()
+ {
+ return ClientState.getLastTimestampMicros();
+ }
+ }
+
+ static class Global
+ {
+ private static BallotGenerator instance = new Default();
+ public static UUID randomBallot(long whenInMicros, boolean isSerial) { return instance.randomBallot(whenInMicros, isSerial); }
+ public static UUID randomBallot(long fromInMicros, long toInMicros, boolean isSerial) { return instance.randomBallot(fromInMicros, toInMicros, isSerial); }
+ public static long nextBallotTimestampMicros(long minWhenInMicros) { return instance.nextBallotTimestampMicros(minWhenInMicros); }
+ public static long prevBallotTimestampMicros() { return instance.prevBallotTimestampMicros(); }
+
+ public static void unsafeSet(BallotGenerator newInstance)
+ {
+ instance = newInstance;
+ }
+ }
+
+ UUID randomBallot(long whenInMicros, boolean isSerial);
+ UUID randomBallot(long fromInMicros, long toInMicros, boolean isSerial);
+ long nextBallotTimestampMicros(long minWhenInMicros);
+ long prevBallotTimestampMicros();
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
index dc2f9a7..64eca68 100644
--- a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.Nemesis;
/**
* ProposeCallback has two modes of operation, controlled by the failFast parameter.
@@ -46,7 +47,7 @@ public class ProposeCallback extends AbstractPaxosCallback<Boolean>
{
private static final Logger logger = LoggerFactory.getLogger(ProposeCallback.class);
- private final AtomicInteger accepts = new AtomicInteger(0);
+ @Nemesis private final AtomicInteger accepts = new AtomicInteger(0);
private final int requiredAccepts;
private final boolean failFast;
diff --git a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
index bbb8cb5..09e9996 100644
--- a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
+++ b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
@@ -46,7 +46,7 @@ import org.apache.cassandra.transport.Flusher.FlushItem.Framed;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.utils.NoSpamLogger;
-import static org.apache.cassandra.utils.MonotonicClock.approxTime;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
/**
* Implementation of {@link AbstractMessageHandler} for processing CQL messages which comprise a {@link Message} wrapped
diff --git a/src/java/org/apache/cassandra/utils/Clock.java b/src/java/org/apache/cassandra/utils/Clock.java
index d1a7337..acdfc82 100644
--- a/src/java/org/apache/cassandra/utils/Clock.java
+++ b/src/java/org/apache/cassandra/utils/Clock.java
@@ -37,10 +37,12 @@ import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
@Shared(scope = SIMULATION)
public interface Clock
{
- static final Logger logger = LoggerFactory.getLogger(Clock.class);
-
public static class Global
{
+ // something weird happens with class loading Logger that can cause a deadlock
+ private static Throwable FAILED_TO_INITIALISE;
+ private static String INITIALIZE_MESSAGE;
+
/**
* Static singleton object that will be instantiated by default with a system clock
* implementation. Set <code>cassandra.clock</code> system property to a FQCN to use a
@@ -52,19 +54,38 @@ public interface Clock
{
String classname = CLOCK_GLOBAL.getString();
Clock clock = new Default();
+ Throwable errorOutcome = null;
+ String outcome = null;
if (classname != null)
{
try
{
- logger.debug("Using custom clock implementation: {}", classname);
+ outcome = "Using custom clock implementation: " + classname;
clock = (Clock) Class.forName(classname).newInstance();
}
- catch (Exception e)
+ catch (Throwable t)
{
- logger.error("Failed to load clock implementation {}", classname, e);
+ outcome = "Failed to load clock implementation " + classname;
+ errorOutcome = t;
}
}
instance = clock;
+ FAILED_TO_INITIALISE = errorOutcome;
+ INITIALIZE_MESSAGE = outcome;
+ }
+
+ public static void logInitializationOutcome(Logger logger)
+ {
+ if (FAILED_TO_INITIALISE != null)
+ {
+ logger.error(INITIALIZE_MESSAGE, FAILED_TO_INITIALISE);
+ }
+ else if (INITIALIZE_MESSAGE != null)
+ {
+ logger.debug(INITIALIZE_MESSAGE);
+ }
+ FAILED_TO_INITIALISE = null;
+ INITIALIZE_MESSAGE = null;
}
/**
diff --git a/src/java/org/apache/cassandra/utils/MonotonicClock.java b/src/java/org/apache/cassandra/utils/MonotonicClock.java
index e14fd45..d4590c9 100644
--- a/src/java/org/apache/cassandra/utils/MonotonicClock.java
+++ b/src/java/org/apache/cassandra/utils/MonotonicClock.java
@@ -39,7 +39,7 @@ import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
* Wrapper around time related functions that are either implemented by using the default JVM calls
* or by using a custom implementation for testing purposes.
*
- * See {@link #preciseTime} for how to use a custom implementation.
+ * See {@link Global#preciseTime} for how to use a custom implementation.
*
* Please note that {@link java.time.Clock} wasn't used, as it would not be possible to provide an
* implementation for {@link #now()} with the exact same properties of {@link System#nanoTime()}.
@@ -49,13 +49,6 @@ import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
@Shared(scope = SIMULATION)
public interface MonotonicClock
{
- /**
- * Static singleton object that will be instantiated by default with a system clock
- * implementation. Set <code>cassandra.clock</code> system property to a FQCN to use a
- * different implementation instead.
- */
- public static final MonotonicClock preciseTime = Defaults.precise();
- public static final MonotonicClock approxTime = Defaults.approx(preciseTime);
/**
* @see System#nanoTime()
@@ -77,10 +70,18 @@ public interface MonotonicClock
public boolean isAfter(long instant);
public boolean isAfter(long now, long instant);
- static class Defaults
+ public static class Global
{
private static final Logger logger = LoggerFactory.getLogger(MonotonicClock.class);
+ /**
+ * Static singleton object that will be instantiated by default with a system clock
+ * implementation. Set <code>cassandra.clock</code> system property to a FQCN to use a
+ * different implementation instead.
+ */
+ public static final MonotonicClock preciseTime = precise();
+ public static final MonotonicClock approxTime = approx(preciseTime);
+
private static MonotonicClock precise()
{
String sclock = CLOCK_MONOTONIC_PRECISE.getString();
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java b/src/java/org/apache/cassandra/utils/Nemesis.java
similarity index 50%
copy from src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java
copy to src/java/org/apache/cassandra/utils/Nemesis.java
index ecf073d..b5110c4 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java
+++ b/src/java/org/apache/cassandra/utils/Nemesis.java
@@ -16,15 +16,28 @@
* limitations under the License.
*/
-package org.apache.cassandra.concurrent;
+package org.apache.cassandra.utils;
-import java.util.concurrent.ScheduledExecutorService;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
-import org.apache.cassandra.utils.Shared;
+import static org.apache.cassandra.utils.Nemesis.Traffic.HIGH;
-import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
-
-@Shared(scope = SIMULATION)
-public interface ScheduledExecutorPlus extends ExecutorPlus, ScheduledExecutorService
+/**
+ * Annotate fields, particularly important volatile fields, where the system should adversarially schedule
+ * thread events around memory accesses (read or write).
+ *
+ * This can introduce significant simulation overhead, so should be used sparingly.
+ *
+ * TODO: Support @Nemesis on methods, to insert nemesis points either before or after invocations of the method
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ ElementType.FIELD })
+public @interface Nemesis
{
+ enum Traffic { LOW, HIGH }
+
+ Traffic traffic() default HIGH;
}
diff --git a/src/java/org/apache/cassandra/utils/Simulate.java b/src/java/org/apache/cassandra/utils/Simulate.java
new file mode 100644
index 0000000..dd0d230
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Simulate.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+/**
+ * Enable certain features for a specific method or class.
+ *
+ * Note that presently class level annotations are not inherited by inner classes.
+ *
+ * TODO: support package level, and apply to all nested classes
+ */
+public @interface Simulate
+{
+ enum With
+ {
+ /**
+ * Calls to FBUtilities.timestampMicros() will be guaranteed globally monotonically increasing.
+ *
+ * May be annotated at the method or class level.
+ */
+ GLOBAL_CLOCK,
+
+ /**
+ * synchronized methods and blocks, and wait/notify.
+ *
+ * May be annotated at the class level.
+ */
+ MONITORS,
+
+ /**
+ * Usages of LockSupport. This defaults to ON for all classes, including system classes.
+ *
+ * May be annotated at the method or class level.
+ */
+ LOCK_SUPPORT
+ }
+
+ With[] with() default {};
+ With[] without() default {};
+}
diff --git a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
index 86e3c12..026f9f2 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AbstractFuture.java
@@ -316,6 +316,18 @@ public abstract class AbstractFuture<V> implements Future<V>
return this;
}
+
+ /**
+ * Support {@link com.google.common.util.concurrent.Futures#transformAsync(ListenableFuture, AsyncFunction, Executor)} natively
+ *
+ * See {@link #addListener(GenericFutureListener)} for ordering semantics.
+ */
+ @Override
+ public <T> Future<T> map(Function<? super V, ? extends T> mapper)
+ {
+ return map(mapper, null);
+ }
+
/**
* Support more fluid version of {@link com.google.common.util.concurrent.Futures#addCallback}
*
@@ -435,31 +447,31 @@ public abstract class AbstractFuture<V> implements Future<V>
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
{
- return Awaitable.await(this, timeout, unit);
+ return Defaults.await(this, timeout, unit);
}
@Override
public boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit units) throws UncheckedInterruptedException
{
- return Awaitable.awaitThrowUncheckedOnInterrupt(this, time, units);
+ return Defaults.awaitThrowUncheckedOnInterrupt(this, time, units);
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit)
{
- return Awaitable.awaitUninterruptibly(this, timeout, unit);
+ return Defaults.awaitUninterruptibly(this, timeout, unit);
}
@Override
public boolean awaitUntilThrowUncheckedOnInterrupt(long nanoTimeDeadline) throws UncheckedInterruptedException
{
- return Awaitable.awaitUntilThrowUncheckedOnInterrupt(this, nanoTimeDeadline);
+ return Defaults.awaitUntilThrowUncheckedOnInterrupt(this, nanoTimeDeadline);
}
@Override
public boolean awaitUntilUninterruptibly(long nanoTimeDeadline)
{
- return Awaitable.awaitUntilUninterruptibly(this, nanoTimeDeadline);
+ return Defaults.awaitUntilUninterruptibly(this, nanoTimeDeadline);
}
/**
@@ -468,7 +480,7 @@ public abstract class AbstractFuture<V> implements Future<V>
@Override
public Future<V> awaitUninterruptibly()
{
- return Awaitable.awaitUninterruptibly(this);
+ return Defaults.awaitUninterruptibly(this);
}
/**
@@ -477,7 +489,7 @@ public abstract class AbstractFuture<V> implements Future<V>
@Override
public Future<V> awaitThrowUncheckedOnInterrupt() throws UncheckedInterruptedException
{
- return Awaitable.awaitThrowUncheckedOnInterrupt(this);
+ return Defaults.awaitThrowUncheckedOnInterrupt(this);
}
public String toString()
diff --git a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
index b09eeb7..0ef35d5 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/AsyncFuture.java
@@ -123,7 +123,7 @@ public class AsyncFuture<V> extends AbstractFuture<V>
}
/**
- * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively
+ * Support {@link com.google.common.util.concurrent.Futures#transform} natively
*
* See {@link #addListener(GenericFutureListener)} for ordering semantics.
*/
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java b/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java
index 03aab5f..25bdf02 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java
@@ -105,91 +105,96 @@ public interface Awaitable
*/
Awaitable awaitUninterruptibly();
- public static boolean await(Awaitable await, long time, TimeUnit unit) throws InterruptedException
+ // we must declare the static implementation methods outside of the interface,
+ // so that they can be loaded by different classloaders during simulation
+ class Defaults
{
- return await.awaitUntil(nanoTime() + unit.toNanos(time));
- }
-
- public static boolean awaitThrowUncheckedOnInterrupt(Awaitable await, long time, TimeUnit units) throws UncheckedInterruptedException
- {
- return awaitUntilThrowUncheckedOnInterrupt(await, nanoTime() + units.toNanos(time));
- }
-
- public static boolean awaitUninterruptibly(Awaitable await, long time, TimeUnit units)
- {
- return awaitUntilUninterruptibly(await, nanoTime() + units.toNanos(time));
- }
-
- public static <A extends Awaitable> A awaitThrowUncheckedOnInterrupt(A await) throws UncheckedInterruptedException
- {
- try
- {
- await.await();
- }
- catch (InterruptedException e)
+ public static boolean await(Awaitable await, long time, TimeUnit unit) throws InterruptedException
{
- throw new UncheckedInterruptedException();
+ return await.awaitUntil(nanoTime() + unit.toNanos(time));
}
- return await;
- }
- public static boolean awaitUntilThrowUncheckedOnInterrupt(Awaitable await, long nanoTimeDeadline) throws UncheckedInterruptedException
- {
- try
+ public static boolean awaitThrowUncheckedOnInterrupt(Awaitable await, long time, TimeUnit units) throws UncheckedInterruptedException
{
- return await.awaitUntil(nanoTimeDeadline);
+ return awaitUntilThrowUncheckedOnInterrupt(await, nanoTime() + units.toNanos(time));
}
- catch (InterruptedException e)
+
+ public static boolean awaitUninterruptibly(Awaitable await, long time, TimeUnit units)
{
- throw new UncheckedInterruptedException();
+ return awaitUntilUninterruptibly(await, nanoTime() + units.toNanos(time));
}
- }
- /**
- * {@link Awaitable#awaitUntilUninterruptibly(long)}
- */
- public static boolean awaitUntilUninterruptibly(Awaitable await, long nanoTimeDeadline)
- {
- boolean interrupted = false;
- boolean result;
- while (true)
+ public static <A extends Awaitable> A awaitThrowUncheckedOnInterrupt(A await) throws UncheckedInterruptedException
{
try
{
- result = await.awaitUntil(nanoTimeDeadline);
- break;
+ await.await();
}
catch (InterruptedException e)
{
- interrupted = true;
+ throw new UncheckedInterruptedException();
}
+ return await;
}
- if (interrupted)
- Thread.currentThread().interrupt();
- return result;
- }
- /**
- * {@link Awaitable#awaitUninterruptibly()}
- */
- public static <A extends Awaitable> A awaitUninterruptibly(A await)
- {
- boolean interrupted = false;
- while (true)
+ public static boolean awaitUntilThrowUncheckedOnInterrupt(Awaitable await, long nanoTimeDeadline) throws UncheckedInterruptedException
{
try
{
- await.await();
- break;
+ return await.awaitUntil(nanoTimeDeadline);
}
catch (InterruptedException e)
{
- interrupted = true;
+ throw new UncheckedInterruptedException();
+ }
+ }
+
+ /**
+ * {@link Awaitable#awaitUntilUninterruptibly(long)}
+ */
+ public static boolean awaitUntilUninterruptibly(Awaitable await, long nanoTimeDeadline)
+ {
+ boolean interrupted = false;
+ boolean result;
+ while (true)
+ {
+ try
+ {
+ result = await.awaitUntil(nanoTimeDeadline);
+ break;
+ }
+ catch (InterruptedException e)
+ {
+ interrupted = true;
+ }
+ }
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ return result;
+ }
+
+ /**
+ * {@link Awaitable#awaitUninterruptibly()}
+ */
+ public static <A extends Awaitable> A awaitUninterruptibly(A await)
+ {
+ boolean interrupted = false;
+ while (true)
+ {
+ try
+ {
+ await.await();
+ break;
+ }
+ catch (InterruptedException e)
+ {
+ interrupted = true;
+ }
}
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ return await;
}
- if (interrupted)
- Thread.currentThread().interrupt();
- return await;
}
abstract class AbstractAwaitable implements Awaitable
@@ -202,7 +207,7 @@ public interface Awaitable
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException
{
- return Awaitable.await(this, time, unit);
+ return Defaults.await(this, time, unit);
}
/**
@@ -211,7 +216,7 @@ public interface Awaitable
@Override
public boolean awaitThrowUncheckedOnInterrupt(long time, TimeUnit units) throws UncheckedInterruptedException
{
- return Awaitable.awaitThrowUncheckedOnInterrupt(this, time, units);
+ return Defaults.awaitThrowUncheckedOnInterrupt(this, time, units);
}
/**
@@ -227,7 +232,7 @@ public interface Awaitable
*/
public Awaitable awaitThrowUncheckedOnInterrupt() throws UncheckedInterruptedException
{
- return Awaitable.awaitThrowUncheckedOnInterrupt(this);
+ return Defaults.awaitThrowUncheckedOnInterrupt(this);
}
/**
@@ -235,7 +240,7 @@ public interface Awaitable
*/
public boolean awaitUntilThrowUncheckedOnInterrupt(long nanoTimeDeadline) throws UncheckedInterruptedException
{
- return Awaitable.awaitUntilThrowUncheckedOnInterrupt(this, nanoTimeDeadline);
+ return Defaults.awaitUntilThrowUncheckedOnInterrupt(this, nanoTimeDeadline);
}
/**
@@ -243,7 +248,7 @@ public interface Awaitable
*/
public boolean awaitUntilUninterruptibly(long nanoTimeDeadline)
{
- return Awaitable.awaitUntilUninterruptibly(this, nanoTimeDeadline);
+ return Defaults.awaitUntilUninterruptibly(this, nanoTimeDeadline);
}
/**
@@ -251,7 +256,7 @@ public interface Awaitable
*/
public Awaitable awaitUninterruptibly()
{
- return Awaitable.awaitUninterruptibly(this);
+ return Defaults.awaitUninterruptibly(this);
}
}
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Future.java b/src/java/org/apache/cassandra/utils/concurrent/Future.java
index 69dc83d..fae5d43 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Future.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Future.java
@@ -96,6 +96,17 @@ public interface Future<V> extends io.netty.util.concurrent.Future<V>, Listenabl
return this;
}
+ /**
+ * waits for completion; in case of failure rethrows the original exception without a new wrapping exception
+ * so may cause problems for reporting stack traces
+ */
+ default Future<V> syncThrowUncheckedOnInterrupt()
+ {
+ awaitThrowUncheckedOnInterrupt();
+ rethrowIfFailed();
+ return this;
+ }
+
@Deprecated
@Override
default boolean await(long l) throws InterruptedException
diff --git a/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java b/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java
index 57737ea..40b908b 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/ListenerList.java
@@ -38,7 +38,7 @@ import static org.apache.cassandra.utils.concurrent.ListenerList.Notifying.NOTIF
/**
* Encapsulate one or more items in a linked-list that is immutable whilst shared, forming a prepend-only list (or stack).
* Once the list is ready to consume, exclusive ownership is taken by clearing the shared variable containing it, after
- * which the list may be invoked using {@link #notify}, which reverses the list before invoking the work it contains.
+ * which the list may be invoked using {@link #notifyExclusive(ListenerList, Future)}, which reverses the list before invoking the work it contains.
*/
abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
{
@@ -93,7 +93,7 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
{
while (true)
{
- notify(listeners, in);
+ notifyExclusive(listeners, in);
if (updater.compareAndSet(in, NOTIFYING, null))
return;
@@ -113,17 +113,13 @@ abstract class ListenerList<V> extends IntrusiveStack<ListenerList<V>>
*
* @param head must be either a {@link ListenerList} or {@link GenericFutureListener}
*/
- static <T> void notify(ListenerList<T> head, Future<T> future)
+ static <T> void notifyExclusive(ListenerList<T> head, Future<T> future)
{
- Executor notifyExecutor = future.notifyExecutor();
- if (inExecutor(notifyExecutor))
- notifyExecutor = null;
-
- notify(head, notifyExecutor, future);
- }
+ Executor notifyExecutor; {
+ Executor exec = future.notifyExecutor();
+ notifyExecutor = inExecutor(exec) ? null : exec;
+ }
- private static <T> void notify(ListenerList<T> head, Executor notifyExecutor, Future<T> future)
- {
head = reverse(head);
forEach(head, i -> i.notifySelf(notifyExecutor, future));
}
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index c077467..f40e08f 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -47,12 +47,15 @@ import org.apache.cassandra.io.util.SafeMemory;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.Shared;
+
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import static java.util.Collections.emptyList;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.UNSAFE;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.Throwables.merge;
@@ -91,6 +94,13 @@ public final class Ref<T> implements RefCounted<T>
{
static final Logger logger = LoggerFactory.getLogger(Ref.class);
public static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true");
+ static OnLeak ON_LEAK;
+
+ @Shared(scope = SIMULATION)
+ public interface OnLeak
+ {
+ void onLeak(Object state);
+ }
final State state;
final T referent;
@@ -227,6 +237,9 @@ public final class Ref<T> implements RefCounted<T>
logger.error("LEAK DETECTED: a reference ({}) to {} was not released before the reference was garbage collected", id, globalState);
if (DEBUG_ENABLED)
debug.log(id);
+ OnLeak onLeak = ON_LEAK;
+ if (onLeak != null)
+ onLeak.onLeak(this);
}
else if (DEBUG_ENABLED)
{
@@ -235,6 +248,12 @@ public final class Ref<T> implements RefCounted<T>
if (fail != null)
logger.error("Error when closing {}", globalState, fail);
}
+
+ @Override
+ public String toString()
+ {
+ return globalState.toString();
+ }
}
static final class Debug
@@ -678,7 +697,10 @@ public final class Ref<T> implements RefCounted<T>
{
final Set<Tidy> candidates = Collections.newSetFromMap(new IdentityHashMap<>());
for (GlobalState state : globallyExtant)
- candidates.add(state.tidy);
+ {
+ if (state.tidy != null)
+ candidates.add(state.tidy);
+ }
removeExpected(candidates);
this.candidates.retainAll(candidates);
if (!this.candidates.isEmpty())
@@ -706,6 +728,11 @@ public final class Ref<T> implements RefCounted<T>
}
}
+ public static void setOnLeak(OnLeak onLeak)
+ {
+ ON_LEAK = onLeak;
+ }
+
@VisibleForTesting
public static void shutdownReferenceReaper(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java b/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java
index c3f03a5..01c52c5 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java
@@ -19,15 +19,10 @@
package org.apache.cassandra.utils.concurrent;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import net.openhft.chronicle.core.util.ThrowingConsumer;
import org.apache.cassandra.utils.Intercept;
import org.apache.cassandra.utils.Shared;
-import static java.lang.System.nanoTime;
-import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
@Shared(scope = SIMULATION)
@@ -88,7 +83,7 @@ public interface Semaphore
@Intercept
public static Semaphore newSemaphore(int permits)
{
- return new UnfairAsync(permits);
+ return new Standard(permits, false);
}
/**
@@ -99,160 +94,19 @@ public interface Semaphore
@Intercept
public static Semaphore newFairSemaphore(int permits)
{
- return new FairJDK(permits);
+ return new Standard(permits, true);
}
- /**
- * An unfair semaphore, making no guarantees about thread starvation.
- *
- * TODO this Semaphore is potentially inefficient if used with release quantities other than 1
- * (this is unimportant at time of authoring)
- */
- public static class UnfairAsync implements Semaphore
+ public static class Standard extends java.util.concurrent.Semaphore implements Semaphore
{
- private static final AtomicReferenceFieldUpdater<UnfairAsync, WaitQueue> waitingUpdater = AtomicReferenceFieldUpdater.newUpdater(UnfairAsync.class, WaitQueue.class, "waiting");
- private static final AtomicIntegerFieldUpdater<UnfairAsync> permitsUpdater = AtomicIntegerFieldUpdater.newUpdater(UnfairAsync.class, "permits");
- private volatile WaitQueue waiting;
- private volatile int permits;
-
- // WARNING: if extending this class, consider simulator interactions
- public UnfairAsync(int permits)
- {
- this.permits = permits;
- }
-
- /**
- * {@link Semaphore#drain()}
- */
- public int drain()
- {
- return permitsUpdater.getAndSet(this, 0);
- }
-
- /**
- * {@link Semaphore#permits()}
- */
- public int permits()
- {
- return permits;
- }
-
- /**
- * {@link Semaphore#release(int)}
- */
- public void release(int permits)
- {
- if (permits < 0) throw new IllegalArgumentException();
- if (permits > 0 && permitsUpdater.getAndAdd(this, permits) == 0)
- {
- if (waiting != null)
- {
- if (permits > 1) waiting.signalAll();
- else waiting.signal();
- }
- }
- }
-
- /**
- * {@link Semaphore#tryAcquire(int)}
- */
- public boolean tryAcquire(int acquire)
- {
- if (acquire < 0)
- throw new IllegalArgumentException();
- while (true)
- {
- int cur = permits;
- if (cur < acquire)
- return false;
- if (permitsUpdater.compareAndSet(this, cur, cur - acquire))
- return true;
- }
- }
-
- /**
- * {@link Semaphore#tryAcquire(int, long, TimeUnit)}
- */
- public boolean tryAcquire(int acquire, long time, TimeUnit unit) throws InterruptedException
- {
- return tryAcquireUntil(acquire, nanoTime() + unit.toNanos(time));
- }
-
- /**
- * {@link Semaphore#tryAcquireUntil(int, long)}
- */
- public boolean tryAcquireUntil(int acquire, long nanoTimeDeadline) throws InterruptedException
- {
- boolean wait = true;
- while (true)
- {
- int cur = permits;
- if (cur < acquire)
- {
- if (!wait) return false;
-
- WaitQueue.Signal signal = register();
- if (permits < acquire) wait = signal.awaitUntil(nanoTimeDeadline);
- else signal.cancel();
- }
- else if (permitsUpdater.compareAndSet(this, cur, cur - acquire))
- return true;
- }
- }
-
- /**
- * {@link Semaphore#acquire(int)}
- */
- public void acquire(int acquire) throws InterruptedException
+ public Standard(int permits)
{
- acquire(acquire, WaitQueue.Signal::await);
+ this(permits, false);
}
- /**
- * {@link Semaphore#acquireThrowUncheckedOnInterrupt(int)}
- */
- public void acquireThrowUncheckedOnInterrupt(int acquire)
+ public Standard(int permits, boolean fair)
{
- acquire(acquire, WaitQueue.Signal::awaitThrowUncheckedOnInterrupt);
- }
-
- private <T extends Throwable> void acquire(int acquire, ThrowingConsumer<WaitQueue.Signal, T> wait) throws T
- {
- while (true)
- {
- int cur = permits;
- if (cur < acquire)
- {
- WaitQueue.Signal signal = register();
- if (permits < acquire) wait.accept(signal);
- else signal.cancel();
- }
- else if (permitsUpdater.compareAndSet(this, cur, cur - acquire))
- return;
- }
- }
-
- private WaitQueue.Signal register()
- {
- if (waiting == null)
- waitingUpdater.compareAndSet(this, null, newWaitQueue());
- return waiting.register();
- }
- }
-
- /**
- * A fair semaphore, guaranteeing threads are signalled in the order they request permits.
- *
- * Unlike {@link UnfairAsync} this class is efficient for arbitrarily-sized increments and decrements,
- * however it has the normal throughput bottleneck of fairness.
- */
- public static class FairJDK implements Semaphore
- {
- final java.util.concurrent.Semaphore wrapped;
-
- public FairJDK(int permits)
- {
- wrapped = new java.util.concurrent.Semaphore(permits, true); // checkstyle: permit this instantiation
+ super(permits, fair);
}
/**
@@ -260,7 +114,7 @@ public interface Semaphore
*/
public int drain()
{
- return wrapped.drainPermits();
+ return drainPermits();
}
/**
@@ -268,7 +122,7 @@ public interface Semaphore
*/
public int permits()
{
- return wrapped.availablePermits();
+ return availablePermits();
}
/**
@@ -276,31 +130,7 @@ public interface Semaphore
*/
public int waiting()
{
- return wrapped.getQueueLength();
- }
-
- /**
- * {@link Semaphore#release(int)}
- */
- public void release(int permits)
- {
- wrapped.release(permits);
- }
-
- /**
- * {@link Semaphore#tryAcquire(int)}
- */
- public boolean tryAcquire(int permits)
- {
- return wrapped.tryAcquire(permits);
- }
-
- /**
- * {@link Semaphore#tryAcquire(int, long, TimeUnit)}
- */
- public boolean tryAcquire(int acquire, long time, TimeUnit unit) throws InterruptedException
- {
- return wrapped.tryAcquire(acquire, time, unit);
+ return getQueueLength();
}
/**
@@ -309,15 +139,7 @@ public interface Semaphore
public boolean tryAcquireUntil(int acquire, long nanoTimeDeadline) throws InterruptedException
{
long wait = nanoTimeDeadline - System.nanoTime();
- return wrapped.tryAcquire(acquire, Math.max(0, wait), TimeUnit.NANOSECONDS);
- }
-
- /**
- * {@link Semaphore#acquire(int)}
- */
- public void acquire(int acquire) throws InterruptedException
- {
- wrapped.acquire(acquire);
+ return tryAcquire(acquire, Math.max(0, wait), TimeUnit.NANOSECONDS);
}
@Override
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
index 43648c0..a7b3473 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SyncFuture.java
@@ -90,7 +90,7 @@ public class SyncFuture<V> extends AbstractFuture<V>
}
/**
- * Support {@link com.google.common.util.concurrent.Futures#transform(ListenableFuture, com.google.common.base.Function, Executor)} natively
+ * Support {@link com.google.common.util.concurrent.Futures#transform} natively
*
* See {@link #addListener(GenericFutureListener)} for ordering semantics.
*/
@@ -165,7 +165,7 @@ public class SyncFuture<V> extends AbstractFuture<V>
private void notifyListeners()
{
- ListenerList.notify(listeners, this);
+ ListenerList.notifyExclusive(listeners, this);
listenersUpdater.lazySet(this, null);
}
}
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index d656616..f302f4f 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.BufferPoolMetrics;
import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.Shared;
import org.apache.cassandra.utils.concurrent.Ref;
import static com.google.common.collect.ImmutableList.of;
@@ -55,6 +56,7 @@ import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFac
import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.UNSAFE;
import static org.apache.cassandra.utils.ExecutorUtils.*;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
import static org.apache.cassandra.utils.memory.MemoryUtil.isExactlyDirect;
/**
@@ -119,6 +121,7 @@ public class BufferPool
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
private volatile Debug debug = Debug.NO_OP;
+ private volatile DebugLeaks debugLeaks = DebugLeaks.NO_OP;
protected final String name;
protected final BufferPoolMetrics metrics;
@@ -305,10 +308,19 @@ public class BufferPool
void recyclePartial(Chunk chunk);
}
- public void debug(Debug setDebug)
+ @Shared(scope = SIMULATION)
+ public interface DebugLeaks
{
- assert setDebug != null;
- this.debug = setDebug;
+ public static DebugLeaks NO_OP = () -> {};
+ void leak();
+ }
+
+ public void debug(Debug newDebug, DebugLeaks newDebugLeaks)
+ {
+ if (newDebug != null)
+ this.debug = newDebug;
+ if (newDebugLeaks != null)
+ this.debugLeaks = newDebugLeaks;
}
interface Recycler
@@ -1025,6 +1037,7 @@ public class BufferPool
Object obj = localPoolRefQueue.remove(100);
if (obj instanceof LocalPoolRef)
{
+ debugLeaks.leak();
((LocalPoolRef) obj).release();
localPoolReferences.remove(obj);
}
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
index 0596aeb..ebe92ac 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.utils.memory;
import java.nio.ByteBuffer;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.utils.Shared;
import org.apache.cassandra.utils.concurrent.OpOrder;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
public class HeapPool extends MemtablePool
{
private static final EnsureOnHeap ENSURE_NOOP = new EnsureOnHeap.NoOp();
@@ -58,6 +61,7 @@ public class HeapPool extends MemtablePool
public static class Logged extends MemtablePool
{
+ @Shared(scope = SIMULATION)
public interface Listener
{
public void accept(long size, String table);
diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
index d9e8372..9bb217c 100644
--- a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
+++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
@@ -265,7 +265,7 @@ public class LongBufferPoolTest
logger.info("{} - testing {} threads for {}m", DATE_FORMAT.format(new Date()), threadCount, TimeUnit.NANOSECONDS.toMinutes(duration));
logger.info("Testing BufferPool with memoryUsageThreshold={} and enabling BufferPool.DEBUG", bufferPool.memoryUsageThreshold());
Debug debug = new Debug();
- bufferPool.debug(debug);
+ bufferPool.debug(debug, null);
TestEnvironment testEnv = new TestEnvironment(threadCount, duration, bufferPool.memoryUsageThreshold());
@@ -305,7 +305,7 @@ public class LongBufferPoolTest
assertEquals(0, testEnv.executorService.shutdownNow().size());
logger.info("Reverting BufferPool DEBUG config");
- bufferPool.debug(BufferPool.Debug.NO_OP);
+ bufferPool.debug(BufferPool.Debug.NO_OP, null);
testEnv.assertCheckedThreadsSucceeded();
diff --git a/test/unit/org/apache/cassandra/concurrent/AbstractExecutorPlusTest.java b/test/unit/org/apache/cassandra/concurrent/AbstractExecutorPlusTest.java
index eadacd1..52650ad 100644
--- a/test/unit/org/apache/cassandra/concurrent/AbstractExecutorPlusTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/AbstractExecutorPlusTest.java
@@ -32,6 +32,8 @@ import org.apache.cassandra.utils.WithResources;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.Semaphore;
+import static org.apache.cassandra.utils.concurrent.Semaphore.newSemaphore;
+
@Ignore
public abstract class AbstractExecutorPlusTest
{
@@ -100,9 +102,9 @@ public abstract class AbstractExecutorPlusTest
SequentialExecutorPlus exec = builder.build();
- Semaphore enter = new Semaphore.UnfairAsync(0);
- Semaphore exit = new Semaphore.UnfairAsync(0);
- Semaphore runAfter = new Semaphore.UnfairAsync(0);
+ Semaphore enter = newSemaphore(0);
+ Semaphore exit = newSemaphore(0);
+ Semaphore runAfter = newSemaphore(0);
SequentialExecutorPlus.AtLeastOnceTrigger trigger;
trigger = exec.atLeastOnceTrigger(() -> { enter.release(1); exit.acquireThrowUncheckedOnInterrupt(1); });
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/SemaphoreTest.java b/test/unit/org/apache/cassandra/utils/concurrent/SemaphoreTest.java
index 77ed5ab..c4cb86d 100644
--- a/test/unit/org/apache/cassandra/utils/concurrent/SemaphoreTest.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/SemaphoreTest.java
@@ -36,7 +36,7 @@ public class SemaphoreTest
@Test
public void testUnfair() throws InterruptedException
{
- Semaphore s = new Semaphore.UnfairAsync(2);
+ Semaphore s = Semaphore.newSemaphore(2);
List<Future<Boolean>> fs = start(s);
s.release(1);
while (s.permits() == 1) Thread.yield();
@@ -54,7 +54,7 @@ public class SemaphoreTest
@Test
public void testFair() throws InterruptedException, ExecutionException, TimeoutException
{
- Semaphore s = new Semaphore.FairJDK(2);
+ Semaphore s = Semaphore.newFairSemaphore(2);
List<Future<Boolean>> fs = start(s);
s.release(1);
fs.get(0).get(1L, TimeUnit.MINUTES);
@@ -83,9 +83,9 @@ public class SemaphoreTest
try { s.tryAcquireUntil(1, System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(1L)); Assert.fail(); } catch (InterruptedException ignore) { }
List<Future<Boolean>> fs = new ArrayList<>();
fs.add(exec.submit(() -> s.tryAcquire(1, 1L, TimeUnit.MINUTES)));
- while (s instanceof Semaphore.FairJDK && ((Semaphore.FairJDK) s).waiting() == 0) Thread.yield();
+ while (s instanceof Semaphore.Standard && ((Semaphore.Standard) s).waiting() == 0) Thread.yield();
fs.add(exec.submit(() -> s.tryAcquireUntil(1, System.nanoTime() + TimeUnit.MINUTES.toNanos(1L))));
- while (s instanceof Semaphore.FairJDK && ((Semaphore.FairJDK) s).waiting() == 1) Thread.yield();
+ while (s instanceof Semaphore.Standard && ((Semaphore.Standard) s).waiting() == 1) Thread.yield();
fs.add(exec.submit(() -> { s.acquire(1); return true; } ));
return fs;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org