You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2019/01/23 19:28:38 UTC
[ambari] branch trunk updated: AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting. (#2776)
This is an automated email from the ASF dual-hosted git repository.
mpapirkovskyy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 06c4f06 AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting. (#2776)
06c4f06 is described below
commit 06c4f06cb4226053ea4c712a1e3f5548488043cd
Author: Myroslav Papirkovskyi <mp...@apache.org>
AuthorDate: Wed Jan 23 21:28:32 2019 +0200
AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting. (#2776)
* AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting. (mpapirkovskyy)
* AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting. (mpapirkovskyy)
* AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting. (mpapirkovskyy)
---
.../agent/stomp/AmbariSubscriptionRegistry.java | 235 ++++++++++++---------
1 file changed, 131 insertions(+), 104 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
index 68330c6..7c820a0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
@@ -29,7 +29,6 @@ import java.util.concurrent.CopyOnWriteArraySet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.expression.AccessException;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
@@ -37,7 +36,8 @@ import org.springframework.expression.PropertyAccessor;
import org.springframework.expression.TypedValue;
import org.springframework.expression.spel.SpelEvaluationException;
import org.springframework.expression.spel.standard.SpelExpressionParser;
-import org.springframework.expression.spel.support.StandardEvaluationContext;
+import org.springframework.expression.spel.support.SimpleEvaluationContext;
+import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
@@ -60,6 +60,10 @@ import com.google.common.cache.CacheBuilder;
public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
private static final Logger LOG = LoggerFactory.getLogger(AmbariSubscriptionRegistry.class);
+ /** Static evaluation context to reuse. */
+ private static final EvaluationContext messageEvalContext =
+ SimpleEvaluationContext.forPropertyAccessors(new SimpMessageHeaderPropertyAccessor()).build();
+
private PathMatcher pathMatcher = new AntPathMatcher();
private volatile int cacheLimit;
@@ -138,25 +142,32 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
protected void addSubscriptionInternal(
String sessionId, String subsId, String destination, Message<?> message) {
+ Expression expression = getSelectorExpression(message.getHeaders());
+ this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
+ this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
+ }
+
+ @Nullable
+ private Expression getSelectorExpression(MessageHeaders headers) {
Expression expression = null;
- MessageHeaders headers = message.getHeaders();
- String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
- if (selector != null) {
- try {
- expression = this.expressionParser.parseExpression(selector);
- this.selectorHeaderInUse = true;
- if (logger.isTraceEnabled()) {
- logger.trace("Subscription selector: [" + selector + "]");
+ if (getSelectorHeaderName() != null) {
+ String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
+ if (selector != null) {
+ try {
+ expression = this.expressionParser.parseExpression(selector);
+ this.selectorHeaderInUse = true;
+ if (logger.isTraceEnabled()) {
+ logger.trace("Subscription selector: [" + selector + "]");
+ }
}
- }
- catch (Throwable ex) {
- if (logger.isDebugEnabled()) {
- logger.debug("Failed to parse selector: " + selector, ex);
+ catch (Throwable ex) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failed to parse selector: " + selector, ex);
+ }
}
}
}
- this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
- this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
+ return expression;
}
@Override
@@ -190,29 +201,24 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
if (!this.selectorHeaderInUse) {
return allMatches;
}
- EvaluationContext context = null;
- MultiValueMap<String, String> result = new LinkedMultiValueMap<String, String>(allMatches.size());
- for (String sessionId : allMatches.keySet()) {
- for (String subId : allMatches.get(sessionId)) {
+ MultiValueMap<String, String> result = new LinkedMultiValueMap<>(allMatches.size());
+ allMatches.forEach((sessionId, subIds) -> {
+ subIds.forEach(subId -> {
SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId);
if (info == null) {
- continue;
+ return;
}
Subscription sub = info.getSubscription(subId);
if (sub == null) {
- continue;
+ return;
}
Expression expression = sub.getSelectorExpression();
if (expression == null) {
result.add(sessionId, subId);
- continue;
- }
- if (context == null) {
- context = new StandardEvaluationContext(message);
- context.getPropertyAccessors().add(new SimpMessageHeaderPropertyAccessor());
+ return;
}
try {
- if (expression.getValue(context, boolean.class)) {
+ if (Boolean.TRUE.equals(expression.getValue(messageEvalContext, message, Boolean.class))) {
result.add(sessionId, subId);
}
}
@@ -224,8 +230,8 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
catch (Throwable ex) {
logger.debug("Failed to evaluate selector", ex);
}
- }
- }
+ });
+ });
return result;
}
@@ -250,35 +256,42 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
CacheBuilder.newBuilder().maximumSize(cacheLimit).build();
public LinkedMultiValueMap<String, String> getSubscriptions(String destination, Message<?> message) {
+ LinkedMultiValueMap<String, String> copiedSubscriptions = new LinkedMultiValueMap<>();
if (notSubscriptionCache.asMap().keySet().contains(destination)) {
- return new LinkedMultiValueMap<>();
+ return copiedSubscriptions;
}
- LinkedMultiValueMap<String, String> subscriptions = this.accessCache.computeIfAbsent(destination, (key) -> {
- LinkedMultiValueMap<String, String> result = new LinkedMultiValueMap<>();
- for (SessionSubscriptionInfo info : subscriptionRegistry.getAllSubscriptions()) {
- for (String destinationPattern : info.getDestinations()) {
- //TODO temporary changed to more fast-acting check without regex, need move investigation
- if (destinationPattern.equals(destination)) {
- for (Subscription subscription : info.getSubscriptions(destinationPattern)) {
- result.add(info.sessionId, subscription.getId());
+ this.accessCache.compute(destination, (key, value) -> {
+ if (value == null) {
+ LinkedMultiValueMap<String, String> result = new LinkedMultiValueMap<>();
+ subscriptionRegistry.getAllSubscriptions().forEach((info) -> {
+ info.getDestinations().forEach((destinationPattern) -> {
+ //TODO temporary changed to more fast-acting check without regex, need move investigation
+ if (destinationPattern.equals(destination)) {
+ info.getSubscriptions(destinationPattern).forEach((subscription) -> {
+ result.add(info.sessionId, subscription.getId());
+ });
}
- }
+ });
+ });
+ if (!result.isEmpty()) {
+ copiedSubscriptions.addAll(result.deepCopy());
+ return result;
+ } else {
+ notSubscriptionCache.put(destination, "");
+ return null;
}
- }
- if (!result.isEmpty()) {
- return result;
} else {
- notSubscriptionCache.put(destination, "");
- return null;
+ copiedSubscriptions.addAll(value.deepCopy());
+ return value;
}
});
- return subscriptions == null ? new LinkedMultiValueMap<>() : subscriptions;
+ return copiedSubscriptions;
}
public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
LinkedMultiValueMap<String, String> updatedMap = this.accessCache.computeIfPresent(destination, (key, value) -> {
if (getPathMatcher().match(destination, key)) {
- LinkedMultiValueMap<String, String> subs = value.deepCopy();
+ LinkedMultiValueMap<String, String> subs = value;
subs.add(sessionId, subsId);
return subs;
}
@@ -294,21 +307,23 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
this.accessCache.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, LinkedMultiValueMap<String, String>> entry = iterator.next();
String destination = entry.getKey();
- LinkedMultiValueMap<String, String> sessionMap = entry.getValue();
- List<String> subscriptions = sessionMap.get(sessionId);
- if (subscriptions != null) {
- subscriptions.remove(subsId);
- if (subscriptions.isEmpty()) {
- sessionMap.remove(sessionId);
- }
- if (sessionMap.isEmpty()) {
- iterator.remove();
- }
- else {
- this.notSubscriptionCache.invalidate(destination);
- this.accessCache.put(destination, sessionMap.deepCopy());
+ this.accessCache.compute(destination, (key, value) -> {
+ if (value != null) {
+ List<String> subscriptions = value.get(sessionId);
+ if (subscriptions != null) {
+ subscriptions.remove(subsId);
+ if (subscriptions.isEmpty()) {
+ value.remove(sessionId);
+ }
+ if (value.isEmpty()) {
+ return null;
+ } else {
+ this.notSubscriptionCache.invalidate(destination);
+ }
+ }
}
- }
+ return value;
+ });
}
}
@@ -317,16 +332,18 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
this.accessCache.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, LinkedMultiValueMap<String, String>> entry = iterator.next();
String destination = entry.getKey();
- LinkedMultiValueMap<String, String> sessionMap = entry.getValue();
- if (sessionMap.remove(info.getSessionId()) != null) {
- if (sessionMap.isEmpty()) {
- iterator.remove();
- }
- else {
- this.notSubscriptionCache.invalidate(destination);
- this.accessCache.put(destination, sessionMap.deepCopy());
+ this.accessCache.compute(destination, (key, value) -> {
+ if (value != null) {
+ if (value.remove(info.getSessionId()) != null) {
+ if (value.isEmpty()) {
+ return null;
+ } else {
+ this.notSubscriptionCache.invalidate(destination);
+ }
+ }
}
- }
+ return value;
+ });
}
}
@@ -343,9 +360,9 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
private static class SessionSubscriptionRegistry {
// sessionId -> SessionSubscriptionInfo
- private final ConcurrentMap<String, SessionSubscriptionInfo> sessions =
- new ConcurrentHashMap<String, SessionSubscriptionInfo>();
+ private final ConcurrentMap<String, SessionSubscriptionInfo> sessions = new ConcurrentHashMap<>();
+ @Nullable
public SessionSubscriptionInfo getSubscriptions(String sessionId) {
return this.sessions.get(sessionId);
}
@@ -355,7 +372,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
}
public SessionSubscriptionInfo addSubscription(String sessionId, String subscriptionId,
- String destination, Expression selectorExpression) {
+ String destination, @Nullable Expression selectorExpression) {
SessionSubscriptionInfo info = this.sessions.get(sessionId);
if (info == null) {
@@ -369,6 +386,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
return info;
}
+ @Nullable
public SessionSubscriptionInfo removeSubscriptions(String sessionId) {
return this.sessions.remove(sessionId);
}
@@ -388,8 +406,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
private final String sessionId;
// destination -> subscriptions
- private final Map<String, Set<Subscription>> destinationLookup =
- new ConcurrentHashMap<String, Set<Subscription>>(4);
+ private final Map<String, Set<Subscription>> destinationLookup = new ConcurrentHashMap<>(4);
public SessionSubscriptionInfo(String sessionId) {
Assert.notNull(sessionId, "'sessionId' must not be null");
@@ -408,27 +425,26 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
return this.destinationLookup.get(destination);
}
+ @Nullable
public Subscription getSubscription(String subscriptionId) {
- for (String destination : this.destinationLookup.keySet()) {
- Set<Subscription> subs = this.destinationLookup.get(destination);
- if (subs != null) {
- for (Subscription sub : subs) {
- if (sub.getId().equalsIgnoreCase(subscriptionId)) {
- return sub;
- }
+ for (Map.Entry<String, Set<Subscription>> destinationEntry :
+ this.destinationLookup.entrySet()) {
+ for (Subscription sub : destinationEntry.getValue()) {
+ if (sub.getId().equalsIgnoreCase(subscriptionId)) {
+ return sub;
}
}
}
return null;
}
- public void addSubscription(String destination, String subscriptionId, Expression selectorExpression) {
+ public void addSubscription(String destination, String subscriptionId, @Nullable Expression selectorExpression) {
Set<Subscription> subs = this.destinationLookup.get(destination);
if (subs == null) {
synchronized (this.destinationLookup) {
subs = this.destinationLookup.get(destination);
if (subs == null) {
- subs = new CopyOnWriteArraySet<Subscription>();
+ subs = new CopyOnWriteArraySet<>();
this.destinationLookup.put(destination, subs);
}
}
@@ -436,18 +452,20 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
subs.add(new Subscription(subscriptionId, selectorExpression));
}
+ @Nullable
public String removeSubscription(String subscriptionId) {
- for (String destination : this.destinationLookup.keySet()) {
- Set<Subscription> subs = this.destinationLookup.get(destination);
+ for (Map.Entry<String, Set<Subscription>> destinationEntry :
+ this.destinationLookup.entrySet()) {
+ Set<Subscription> subs = destinationEntry.getValue();
if (subs != null) {
for (Subscription sub : subs) {
if (sub.getId().equals(subscriptionId) && subs.remove(sub)) {
synchronized (this.destinationLookup) {
if (subs.isEmpty()) {
- this.destinationLookup.remove(destination);
+ this.destinationLookup.remove(destinationEntry.getKey());
}
}
- return destination;
+ return destinationEntry.getKey();
}
}
}
@@ -466,9 +484,10 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
private final String id;
+ @Nullable
private final Expression selectorExpression;
- public Subscription(String id, Expression selector) {
+ public Subscription(String id, @Nullable Expression selector) {
Assert.notNull(id, "Subscription id must not be null");
this.id = id;
this.selectorExpression = selector;
@@ -478,6 +497,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
return this.id;
}
+ @Nullable
public Expression getSelectorExpression() {
return this.selectorExpression;
}
@@ -503,39 +523,46 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
@Override
public Class<?>[] getSpecificTargetClasses() {
- return new Class<?>[] {MessageHeaders.class};
+ return new Class<?>[]{Message.class, MessageHeaders.class};
}
@Override
- public boolean canRead(EvaluationContext context, Object target, String name) {
+ public boolean canRead(EvaluationContext context, @Nullable Object target, String name) {
return true;
}
@Override
- public TypedValue read(EvaluationContext context, Object target, String name) throws AccessException {
- MessageHeaders headers = (MessageHeaders) target;
- SimpMessageHeaderAccessor accessor =
- MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class);
+ public TypedValue read(EvaluationContext context, @Nullable Object target, String name) {
Object value;
- if ("destination".equalsIgnoreCase(name)) {
- value = accessor.getDestination();
- }
- else {
- value = accessor.getFirstNativeHeader(name);
- if (value == null) {
- value = headers.get(name);
+ if (target instanceof Message) {
+ value = name.equals("headers") ? ((Message) target).getHeaders() : null;
+ } else if (target instanceof MessageHeaders) {
+ MessageHeaders headers = (MessageHeaders) target;
+ SimpMessageHeaderAccessor accessor =
+ MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class);
+ Assert.state(accessor != null, "No SimpMessageHeaderAccessor");
+ if ("destination".equalsIgnoreCase(name)) {
+ value = accessor.getDestination();
+ } else {
+ value = accessor.getFirstNativeHeader(name);
+ if (value == null) {
+ value = headers.get(name);
+ }
}
+ } else {
+ // Should never happen...
+ throw new IllegalStateException("Expected Message or MessageHeaders.");
}
return new TypedValue(value);
}
@Override
- public boolean canWrite(EvaluationContext context, Object target, String name) {
+ public boolean canWrite(EvaluationContext context, @Nullable Object target, String name) {
return false;
}
@Override
- public void write(EvaluationContext context, Object target, String name, Object value) {
+ public void write(EvaluationContext context, @Nullable Object target, String name, @Nullable Object value) {
}
}