You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/04/02 21:19:51 UTC
[3/6] git commit: Don't shut ExpiringMap down.
Don't shut ExpiringMap down.
Patch by Benedict, reviewed by brandonwilliams for CASSANDRA-6948
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4ec31f2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4ec31f2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4ec31f2
Branch: refs/heads/trunk
Commit: d4ec31f21eb83502bd523c057bce2d3b249b3d80
Parents: 17278b3
Author: Brandon Williams <br...@apache.org>
Authored: Wed Apr 2 14:05:14 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Apr 2 14:14:30 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/net/MessagingService.java | 13 +-----
.../org/apache/cassandra/utils/ExpiringMap.java | 47 +++++---------------
.../apache/cassandra/service/RemoveTest.java | 1 -
4 files changed, 15 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 66196d0..9bbcf07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.7
+ * Don't shut ExpiringMap down (CASSANDRA-6948)
* Restrict Windows to parallel repairs (CASSANDRA-6907)
* (Hadoop) Allow manually specifying start/end tokens in CFIF (CASSANDRA-6436)
* Fix NPE in MeteredFlusher (CASSANDRA-6820)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index ad86bbd..094e861 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -392,7 +392,6 @@ public final class MessagingService implements MessagingServiceMBean
*/
public void listen(InetAddress localEp) throws ConfigurationException
{
- callbacks.reset(); // hack to allow tests to stop/restart MS
for (ServerSocket ss : getServerSockets(localEp))
{
SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
@@ -536,7 +535,7 @@ public final class MessagingService implements MessagingServiceMBean
{
assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
int messageId = nextId();
- CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+ ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
return messageId;
}
@@ -545,7 +544,7 @@ public final class MessagingService implements MessagingServiceMBean
{
assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
int messageId = nextId();
- CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
+ ExpiringMap.CacheableObject<CallbackInfo> previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout);
assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
return messageId;
}
@@ -654,11 +653,6 @@ public final class MessagingService implements MessagingServiceMBean
subscribers.add(subcriber);
}
- public void clearCallbacksUnsafe()
- {
- callbacks.reset();
- }
-
/**
* Wait for callbacks and don't allow any more to be created (since they could require writing hints)
*/
@@ -668,9 +662,6 @@ public final class MessagingService implements MessagingServiceMBean
// We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
assert !StageManager.getStage(Stage.MUTATION).isShutdown();
- // the important part
- callbacks.shutdownBlocking();
-
// attempt to humor tests that try to stop and restart MS
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index 7eec40e..cbec808 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -21,6 +21,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -35,7 +36,6 @@ import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
public class ExpiringMap<K, V>
{
private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
- private volatile boolean shutdown;
public static class CacheableObject<T>
{
@@ -55,12 +55,17 @@ public class ExpiringMap<K, V>
{
return atNano - createdAt > TimeUnit.MILLISECONDS.toNanos(timeout);
}
+
+ public String toString()
+ {
+ return "CacheableObject(obj=" + value.toString() + ", deltaFromTimeout=" + (System.nanoTime() - (createdAt + TimeUnit.MILLISECONDS.toNanos(timeout))) + "ns)";
+ }
}
// if we use more ExpiringMaps we may want to add multiple threads to this executor
private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");
- private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<K, CacheableObject<V>>();
+ private final ConcurrentMap<K, CacheableObject<V>> cache = new ConcurrentHashMap<>();
private final long defaultExpiration;
public ExpiringMap(long defaultExpiration)
@@ -81,7 +86,7 @@ public class ExpiringMap<K, V>
throw new IllegalArgumentException("Argument specified must be a positive number");
}
- Runnable runnable = new Runnable()
+ Runnable reaperTask = new Runnable()
{
public void run()
{
@@ -100,45 +105,17 @@ public class ExpiringMap<K, V>
logger.trace("Expired {} entries", n);
}
};
- service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
+ service.scheduleWithFixedDelay(reaperTask, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
}
- public void shutdownBlocking()
- {
- service.shutdown();
- try
- {
- service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
-
- public void reset()
- {
- shutdown = false;
- cache.clear();
- }
-
- public V put(K key, V value)
+ public CacheableObject<V> put(K key, V value)
{
return put(key, value, this.defaultExpiration);
}
- public V put(K key, V value, long timeout)
+ public CacheableObject<V> put(K key, V value, long timeout)
{
- if (shutdown)
- {
- // StorageProxy isn't equipped to deal with "I'm nominally alive, but I can't send any messages out."
- // So we'll just sit on this thread until the rest of the server shutdown completes.
- //
- // See comments in CustomTThreadPoolServer.serve, CASSANDRA-3335, and CASSANDRA-3727.
- Uninterruptibles.sleepUninterruptibly(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
- }
- CacheableObject<V> previous = cache.put(key, new CacheableObject<V>(value, timeout));
- return (previous == null) ? null : previous.value;
+ return cache.put(key, new CacheableObject<V>(value, timeout));
}
public V get(K key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4ec31f2/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 62dd636..82e5e9e 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -92,7 +92,6 @@ public class RemoveTest
public void tearDown()
{
SinkManager.clear();
- MessagingService.instance().clearCallbacksUnsafe();
MessagingService.instance().shutdown();
}