You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2019/01/23 19:28:38 UTC

[ambari] branch trunk updated: AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting. (#2776)

This is an automated email from the ASF dual-hosted git repository.

mpapirkovskyy pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06c4f06  AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting. (#2776)
06c4f06 is described below

commit 06c4f06cb4226053ea4c712a1e3f5548488043cd
Author: Myroslav Papirkovskyi <mp...@apache.org>
AuthorDate: Wed Jan 23 21:28:32 2019 +0200

    AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting. (#2776)
    
    * AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting. (mpapirkovskyy)
    
    * AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting. (mpapirkovskyy)
    
    * AMBARI-25111. Intermittent ConcurrentModificationException exception during STOMP message emitting. (mpapirkovskyy)
---
 .../agent/stomp/AmbariSubscriptionRegistry.java    | 235 ++++++++++++---------
 1 file changed, 131 insertions(+), 104 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
index 68330c6..7c820a0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AmbariSubscriptionRegistry.java
@@ -29,7 +29,6 @@ import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.expression.AccessException;
 import org.springframework.expression.EvaluationContext;
 import org.springframework.expression.Expression;
 import org.springframework.expression.ExpressionParser;
@@ -37,7 +36,8 @@ import org.springframework.expression.PropertyAccessor;
 import org.springframework.expression.TypedValue;
 import org.springframework.expression.spel.SpelEvaluationException;
 import org.springframework.expression.spel.standard.SpelExpressionParser;
-import org.springframework.expression.spel.support.StandardEvaluationContext;
+import org.springframework.expression.spel.support.SimpleEvaluationContext;
+import org.springframework.lang.Nullable;
 import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageHeaders;
 import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
@@ -60,6 +60,10 @@ import com.google.common.cache.CacheBuilder;
 public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
   private static final Logger LOG = LoggerFactory.getLogger(AmbariSubscriptionRegistry.class);
 
+  /** Static evaluation context to reuse. */
+  private static final EvaluationContext messageEvalContext =
+      SimpleEvaluationContext.forPropertyAccessors(new SimpMessageHeaderPropertyAccessor()).build();
+
   private PathMatcher pathMatcher = new AntPathMatcher();
 
   private volatile int cacheLimit;
@@ -138,25 +142,32 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
   protected void addSubscriptionInternal(
       String sessionId, String subsId, String destination, Message<?> message) {
 
+    Expression expression = getSelectorExpression(message.getHeaders());
+    this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
+    this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
+  }
+
+  @Nullable
+  private Expression getSelectorExpression(MessageHeaders headers) {
     Expression expression = null;
-    MessageHeaders headers = message.getHeaders();
-    String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
-    if (selector != null) {
-      try {
-        expression = this.expressionParser.parseExpression(selector);
-        this.selectorHeaderInUse = true;
-        if (logger.isTraceEnabled()) {
-          logger.trace("Subscription selector: [" + selector + "]");
+    if (getSelectorHeaderName() != null) {
+      String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
+      if (selector != null) {
+        try {
+          expression = this.expressionParser.parseExpression(selector);
+          this.selectorHeaderInUse = true;
+          if (logger.isTraceEnabled()) {
+            logger.trace("Subscription selector: [" + selector + "]");
+          }
         }
-      }
-      catch (Throwable ex) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Failed to parse selector: " + selector, ex);
+        catch (Throwable ex) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Failed to parse selector: " + selector, ex);
+          }
         }
       }
     }
-    this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
-    this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
+    return expression;
   }
 
   @Override
@@ -190,29 +201,24 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
     if (!this.selectorHeaderInUse) {
       return allMatches;
     }
-    EvaluationContext context = null;
-    MultiValueMap<String, String> result = new LinkedMultiValueMap<String, String>(allMatches.size());
-    for (String sessionId : allMatches.keySet()) {
-      for (String subId : allMatches.get(sessionId)) {
+    MultiValueMap<String, String> result = new LinkedMultiValueMap<>(allMatches.size());
+    allMatches.forEach((sessionId, subIds) -> {
+      subIds.forEach(subId -> {
         SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId);
         if (info == null) {
-          continue;
+          return;
         }
         Subscription sub = info.getSubscription(subId);
         if (sub == null) {
-          continue;
+          return;
         }
         Expression expression = sub.getSelectorExpression();
         if (expression == null) {
           result.add(sessionId, subId);
-          continue;
-        }
-        if (context == null) {
-          context = new StandardEvaluationContext(message);
-          context.getPropertyAccessors().add(new SimpMessageHeaderPropertyAccessor());
+          return;
         }
         try {
-          if (expression.getValue(context, boolean.class)) {
+          if (Boolean.TRUE.equals(expression.getValue(messageEvalContext, message, Boolean.class))) {
             result.add(sessionId, subId);
           }
         }
@@ -224,8 +230,8 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
         catch (Throwable ex) {
           logger.debug("Failed to evaluate selector", ex);
         }
-      }
-    }
+      });
+    });
     return result;
   }
 
@@ -250,35 +256,42 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
         CacheBuilder.newBuilder().maximumSize(cacheLimit).build();
 
     public LinkedMultiValueMap<String, String> getSubscriptions(String destination, Message<?> message) {
+      LinkedMultiValueMap<String, String> copiedSubscriptions = new LinkedMultiValueMap<>();
       if (notSubscriptionCache.asMap().keySet().contains(destination)) {
-        return new LinkedMultiValueMap<>();
+        return copiedSubscriptions;
       }
-      LinkedMultiValueMap<String, String> subscriptions = this.accessCache.computeIfAbsent(destination, (key) -> {
-        LinkedMultiValueMap<String, String> result = new LinkedMultiValueMap<>();
-        for (SessionSubscriptionInfo info : subscriptionRegistry.getAllSubscriptions()) {
-          for (String destinationPattern : info.getDestinations()) {
-            //TODO temporary changed to more fast-acting check without regex, need move investigation
-            if (destinationPattern.equals(destination)) {
-              for (Subscription subscription : info.getSubscriptions(destinationPattern)) {
-                result.add(info.sessionId, subscription.getId());
+      this.accessCache.compute(destination, (key, value) -> {
+        if (value == null) {
+          LinkedMultiValueMap<String, String> result = new LinkedMultiValueMap<>();
+          subscriptionRegistry.getAllSubscriptions().forEach((info) -> {
+            info.getDestinations().forEach((destinationPattern) -> {
+              //TODO temporarily changed to a faster check without regex; needs more investigation
+              if (destinationPattern.equals(destination)) {
+                info.getSubscriptions(destinationPattern).forEach((subscription) -> {
+                  result.add(info.sessionId, subscription.getId());
+                });
               }
-            }
+            });
+          });
+          if (!result.isEmpty()) {
+            copiedSubscriptions.addAll(result.deepCopy());
+            return result;
+          } else {
+            notSubscriptionCache.put(destination, "");
+            return null;
           }
-        }
-        if (!result.isEmpty()) {
-          return result;
         } else {
-          notSubscriptionCache.put(destination, "");
-          return null;
+          copiedSubscriptions.addAll(value.deepCopy());
+          return value;
         }
       });
-      return subscriptions == null ? new LinkedMultiValueMap<>() : subscriptions;
+      return copiedSubscriptions;
     }
 
     public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
       LinkedMultiValueMap<String, String> updatedMap = this.accessCache.computeIfPresent(destination, (key, value) -> {
         if (getPathMatcher().match(destination, key)) {
-          LinkedMultiValueMap<String, String> subs = value.deepCopy();
+          LinkedMultiValueMap<String, String> subs = value;
           subs.add(sessionId, subsId);
           return subs;
         }
@@ -294,21 +307,23 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
            this.accessCache.entrySet().iterator(); iterator.hasNext(); ) {
         Map.Entry<String, LinkedMultiValueMap<String, String>> entry = iterator.next();
         String destination = entry.getKey();
-        LinkedMultiValueMap<String, String> sessionMap = entry.getValue();
-        List<String> subscriptions = sessionMap.get(sessionId);
-        if (subscriptions != null) {
-          subscriptions.remove(subsId);
-          if (subscriptions.isEmpty()) {
-            sessionMap.remove(sessionId);
-          }
-          if (sessionMap.isEmpty()) {
-            iterator.remove();
-          }
-          else {
-            this.notSubscriptionCache.invalidate(destination);
-            this.accessCache.put(destination, sessionMap.deepCopy());
+        this.accessCache.compute(destination, (key, value) -> {
+          if (value != null) {
+            List<String> subscriptions = value.get(sessionId);
+            if (subscriptions != null) {
+              subscriptions.remove(subsId);
+              if (subscriptions.isEmpty()) {
+                value.remove(sessionId);
+              }
+              if (value.isEmpty()) {
+                return null;
+              } else {
+                this.notSubscriptionCache.invalidate(destination);
+              }
+            }
           }
-        }
+          return value;
+        });
       }
     }
 
@@ -317,16 +332,18 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
            this.accessCache.entrySet().iterator(); iterator.hasNext(); ) {
         Map.Entry<String, LinkedMultiValueMap<String, String>> entry = iterator.next();
         String destination = entry.getKey();
-        LinkedMultiValueMap<String, String> sessionMap = entry.getValue();
-        if (sessionMap.remove(info.getSessionId()) != null) {
-          if (sessionMap.isEmpty()) {
-            iterator.remove();
-          }
-          else {
-            this.notSubscriptionCache.invalidate(destination);
-            this.accessCache.put(destination, sessionMap.deepCopy());
+        this.accessCache.compute(destination, (key, value) -> {
+          if (value != null) {
+            if (value.remove(info.getSessionId()) != null) {
+              if (value.isEmpty()) {
+                return null;
+              } else {
+                this.notSubscriptionCache.invalidate(destination);
+              }
+            }
           }
-        }
+          return value;
+        });
       }
     }
 
@@ -343,9 +360,9 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
   private static class SessionSubscriptionRegistry {
 
     // sessionId -> SessionSubscriptionInfo
-    private final ConcurrentMap<String, SessionSubscriptionInfo> sessions =
-        new ConcurrentHashMap<String, SessionSubscriptionInfo>();
+    private final ConcurrentMap<String, SessionSubscriptionInfo> sessions = new ConcurrentHashMap<>();
 
+    @Nullable
     public SessionSubscriptionInfo getSubscriptions(String sessionId) {
       return this.sessions.get(sessionId);
     }
@@ -355,7 +372,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
     }
 
     public SessionSubscriptionInfo addSubscription(String sessionId, String subscriptionId,
-                                                                                                                         String destination, Expression selectorExpression) {
+                                                   String destination, @Nullable Expression selectorExpression) {
 
       SessionSubscriptionInfo info = this.sessions.get(sessionId);
       if (info == null) {
@@ -369,6 +386,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
       return info;
     }
 
+    @Nullable
     public SessionSubscriptionInfo removeSubscriptions(String sessionId) {
       return this.sessions.remove(sessionId);
     }
@@ -388,8 +406,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
     private final String sessionId;
 
     // destination -> subscriptions
-    private final Map<String, Set<Subscription>> destinationLookup =
-        new ConcurrentHashMap<String, Set<Subscription>>(4);
+    private final Map<String, Set<Subscription>> destinationLookup = new ConcurrentHashMap<>(4);
 
     public SessionSubscriptionInfo(String sessionId) {
       Assert.notNull(sessionId, "'sessionId' must not be null");
@@ -408,27 +425,26 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
       return this.destinationLookup.get(destination);
     }
 
+    @Nullable
     public Subscription getSubscription(String subscriptionId) {
-      for (String destination : this.destinationLookup.keySet()) {
-        Set<Subscription> subs = this.destinationLookup.get(destination);
-        if (subs != null) {
-          for (Subscription sub : subs) {
-            if (sub.getId().equalsIgnoreCase(subscriptionId)) {
-              return sub;
-            }
+      for (Map.Entry<String, Set<Subscription>> destinationEntry :
+          this.destinationLookup.entrySet()) {
+        for (Subscription sub : destinationEntry.getValue()) {
+          if (sub.getId().equalsIgnoreCase(subscriptionId)) {
+            return sub;
           }
         }
       }
       return null;
     }
 
-    public void addSubscription(String destination, String subscriptionId, Expression selectorExpression) {
+    public void addSubscription(String destination, String subscriptionId, @Nullable Expression selectorExpression) {
       Set<Subscription> subs = this.destinationLookup.get(destination);
       if (subs == null) {
         synchronized (this.destinationLookup) {
           subs = this.destinationLookup.get(destination);
           if (subs == null) {
-            subs = new CopyOnWriteArraySet<Subscription>();
+            subs = new CopyOnWriteArraySet<>();
             this.destinationLookup.put(destination, subs);
           }
         }
@@ -436,18 +452,20 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
       subs.add(new Subscription(subscriptionId, selectorExpression));
     }
 
+    @Nullable
     public String removeSubscription(String subscriptionId) {
-      for (String destination : this.destinationLookup.keySet()) {
-        Set<Subscription> subs = this.destinationLookup.get(destination);
+      for (Map.Entry<String, Set<Subscription>> destinationEntry :
+          this.destinationLookup.entrySet()) {
+        Set<Subscription> subs = destinationEntry.getValue();
         if (subs != null) {
           for (Subscription sub : subs) {
             if (sub.getId().equals(subscriptionId) && subs.remove(sub)) {
               synchronized (this.destinationLookup) {
                 if (subs.isEmpty()) {
-                  this.destinationLookup.remove(destination);
+                  this.destinationLookup.remove(destinationEntry.getKey());
                 }
               }
-              return destination;
+              return destinationEntry.getKey();
             }
           }
         }
@@ -466,9 +484,10 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
 
     private final String id;
 
+    @Nullable
     private final Expression selectorExpression;
 
-    public Subscription(String id, Expression selector) {
+    public Subscription(String id, @Nullable Expression selector) {
       Assert.notNull(id, "Subscription id must not be null");
       this.id = id;
       this.selectorExpression = selector;
@@ -478,6 +497,7 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
       return this.id;
     }
 
+    @Nullable
     public Expression getSelectorExpression() {
       return this.selectorExpression;
     }
@@ -503,39 +523,46 @@ public class AmbariSubscriptionRegistry extends AbstractSubscriptionRegistry {
 
     @Override
     public Class<?>[] getSpecificTargetClasses() {
-      return new Class<?>[] {MessageHeaders.class};
+      return new Class<?>[]{Message.class, MessageHeaders.class};
     }
 
     @Override
-    public boolean canRead(EvaluationContext context, Object target, String name) {
+    public boolean canRead(EvaluationContext context, @Nullable Object target, String name) {
       return true;
     }
 
     @Override
-    public TypedValue read(EvaluationContext context, Object target, String name) throws AccessException {
-      MessageHeaders headers = (MessageHeaders) target;
-      SimpMessageHeaderAccessor accessor =
-          MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class);
+    public TypedValue read(EvaluationContext context, @Nullable Object target, String name) {
       Object value;
-      if ("destination".equalsIgnoreCase(name)) {
-        value = accessor.getDestination();
-      }
-      else {
-        value = accessor.getFirstNativeHeader(name);
-        if (value == null) {
-          value = headers.get(name);
+      if (target instanceof Message) {
+        value = name.equals("headers") ? ((Message) target).getHeaders() : null;
+      } else if (target instanceof MessageHeaders) {
+        MessageHeaders headers = (MessageHeaders) target;
+        SimpMessageHeaderAccessor accessor =
+            MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class);
+        Assert.state(accessor != null, "No SimpMessageHeaderAccessor");
+        if ("destination".equalsIgnoreCase(name)) {
+          value = accessor.getDestination();
+        } else {
+          value = accessor.getFirstNativeHeader(name);
+          if (value == null) {
+            value = headers.get(name);
+          }
         }
+      } else {
+        // Should never happen...
+        throw new IllegalStateException("Expected Message or MessageHeaders.");
       }
       return new TypedValue(value);
     }
 
     @Override
-    public boolean canWrite(EvaluationContext context, Object target, String name) {
+    public boolean canWrite(EvaluationContext context, @Nullable Object target, String name) {
       return false;
     }
 
     @Override
-    public void write(EvaluationContext context, Object target, String name, Object value) {
+    public void write(EvaluationContext context, @Nullable Object target, String name, @Nullable Object value) {
     }
   }