You are viewing a plain text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by aw...@apache.org on 2017/03/27 17:24:49 UTC
cassandra git commit: Fix NPEs in original CASSANDRA-13324 commit
Repository: cassandra
Updated Branches:
refs/heads/trunk 3dabeeaa2 -> c86de2a98
Fix NPEs in original CASSANDRA-13324 commit
Patch by Ariel Weisberg; Reviewed by Marcus Eriksson
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c86de2a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c86de2a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c86de2a9
Branch: refs/heads/trunk
Commit: c86de2a9817aa45930afe181ae1891d2363393c7
Parents: 3dabeea
Author: Ariel Weisberg <aw...@apple.com>
Authored: Fri Mar 24 17:48:24 2017 -0400
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Mar 27 13:24:36 2017 -0400
----------------------------------------------------------------------
.../apache/cassandra/net/MessagingService.java | 40 +++++++++++++-------
1 file changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c86de2a9/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 55604d0..b7d4329 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -536,8 +536,6 @@ public final class MessagingService implements MessagingServiceMBean
if (cp != null)
cp.incrementTimeout();
- getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
-
if (expiredCallbackInfo.callback.supportsBackPressure())
{
updateBackPressureOnReceive(expiredCallbackInfo.target, expiredCallbackInfo.callback, true);
@@ -607,8 +605,12 @@ public final class MessagingService implements MessagingServiceMBean
{
if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
{
- BackPressureState backPressureState = getConnectionPool(host).getBackPressureState();
- backPressureState.onMessageSent(message);
+ OutboundTcpConnectionPool cp = getConnectionPool(host);
+ if (cp != null)
+ {
+ BackPressureState backPressureState = cp.getBackPressureState();
+ backPressureState.onMessageSent(message);
+ }
}
}
@@ -623,11 +625,15 @@ public final class MessagingService implements MessagingServiceMBean
{
if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
{
- BackPressureState backPressureState = getConnectionPool(host).getBackPressureState();
- if (!timeout)
- backPressureState.onResponseReceived();
- else
- backPressureState.onResponseTimeout();
+ OutboundTcpConnectionPool cp = getConnectionPool(host);
+ if (cp != null)
+ {
+ BackPressureState backPressureState = cp.getBackPressureState();
+ if (!timeout)
+ backPressureState.onResponseReceived();
+ else
+ backPressureState.onResponseTimeout();
+ }
}
}
@@ -644,10 +650,16 @@ public final class MessagingService implements MessagingServiceMBean
{
if (DatabaseDescriptor.backPressureEnabled())
{
- backPressure.apply(StreamSupport.stream(hosts.spliterator(), false)
- .filter(h -> !h.equals(FBUtilities.getBroadcastAddress()))
- .map(h -> getConnectionPool(h).getBackPressureState())
- .collect(Collectors.toSet()), timeoutInNanos, TimeUnit.NANOSECONDS);
+ Set<BackPressureState> states = new HashSet<BackPressureState>();
+ for (InetAddress host : hosts)
+ {
+ if (host.equals(FBUtilities.getBroadcastAddress()))
+ continue;
+ OutboundTcpConnectionPool cp = getConnectionPool(host);
+ if (cp != null)
+ states.add(cp.getBackPressureState());
+ }
+ backPressure.apply(states, timeoutInNanos, TimeUnit.NANOSECONDS);
}
}
@@ -679,7 +691,7 @@ public final class MessagingService implements MessagingServiceMBean
if (cp != null)
{
logger.trace("Resetting pool for {}", ep);
- getConnectionPool(ep).reset();
+ cp.reset();
}
else
{