You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/10/21 07:29:52 UTC

incubator-eagle git commit: EAGLE-648: Stream router does not work properly sometimes

Repository: incubator-eagle
Updated Branches:
  refs/heads/master eafff46c3 -> cb6c0cb02


EAGLE-648: Stream router does not work properly sometimes

Author: Li, Garrett
Reviewer: ralphsu

This closes #541


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/cb6c0cb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/cb6c0cb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/cb6c0cb0

Branch: refs/heads/master
Commit: cb6c0cb02410245f8e65846a0fb5a71066e10eec
Parents: eafff46
Author: Xiancheng Li <xi...@ebay.com>
Authored: Thu Oct 20 18:06:38 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Fri Oct 21 14:25:44 2016 +0800

----------------------------------------------------------------------
 .../impl/StreamRouterBoltOutputCollector.java   | 43 ++++++++++-------
 .../alert/engine/runner/StreamRouterBolt.java   | 49 +++++++++++++-------
 2 files changed, 59 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cb6c0cb0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
index ae678ea..11d4d9e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
@@ -52,7 +52,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
     //    private final List<String> outputStreamIds;
     private final StreamContext streamContext;
     private final PartitionedEventSerializer serializer;
-    private volatile Map<StreamPartition, StreamRouterSpec> routeSpecMap;
+    private volatile Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap;
     private volatile Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap;
     private final String sourceId;
 
@@ -70,8 +70,8 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
         try {
             this.streamContext.counter().scope("send_count").incr();
             StreamPartition partition = event.getPartition();
-            StreamRouterSpec routerSpec = routeSpecMap.get(partition);
-            if (routerSpec == null) {
+            List<StreamRouterSpec> routerSpecs = routeSpecMap.get(partition);
+            if (routerSpecs == null || routerSpecs.size() <= 0) {
                 if (LOG.isDebugEnabled()) {
                     // Don't know how to route stream, if it's correct, it's better to filter useless stream in spout side
                     LOG.debug("Drop event {} as StreamPartition {} is not pointed to any router metadata {}", event, event.getPartition(), routeSpecMap);
@@ -80,8 +80,8 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
                 return;
             }
 
-            if (routePartitionerMap.get(routerSpec.getPartition()) == null) {
-                LOG.error("Partitioner for " + routerSpec + " is null");
+            if (routePartitionerMap.get(partition) == null) {
+                LOG.error("Partitioner for " + routerSpecs.get(0) + " is null");
                 synchronized (outputLock) {
                     this.streamContext.counter().scope("fail_count").incr();
                     this.outputCollector.fail(event.getAnchor());
@@ -101,7 +101,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
                     for (StreamRoute streamRoute : streamRoutes) {
                         String targetStreamId = StreamIdConversion.generateStreamIdBetween(sourceId, streamRoute.getTargetComponentId());
                         try {
-                            PartitionedEvent emittedEvent = new PartitionedEvent(newEvent, routerSpec.getPartition(), streamRoute.getPartitionKey());
+                            PartitionedEvent emittedEvent = new PartitionedEvent(newEvent, partition, streamRoute.getPartitionKey());
                             // Route Target Stream id instead of component id
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("Emitted to stream {} with message {}", targetStreamId, emittedEvent);
@@ -135,12 +135,13 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
                                          Collection<StreamRouterSpec> removed,
                                          Collection<StreamRouterSpec> modified,
                                          Map<String, StreamDefinition> sds) {
-        Map<StreamPartition, StreamRouterSpec> copyRouteSpecMap = new HashMap<>(routeSpecMap);
+        Map<StreamPartition, List<StreamRouterSpec>> copyRouteSpecMap = new HashMap<>(routeSpecMap);
         Map<StreamPartition, List<StreamRoutePartitioner>> copyRoutePartitionerMap = new HashMap<>(routePartitionerMap);
 
         // added StreamRouterSpec i.e. there is a new StreamPartition
         for (StreamRouterSpec spec : added) {
-            if (copyRouteSpecMap.containsKey(spec.getPartition())) {
+            if (copyRouteSpecMap.containsKey(spec.getPartition())
+                && copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
                 LOG.error("Metadata calculation error: add existing StreamRouterSpec " + spec);
             } else {
                 inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
@@ -149,7 +150,8 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
 
         // removed StreamRouterSpec i.e. there is a deleted StreamPartition
         for (StreamRouterSpec spec : removed) {
-            if (!copyRouteSpecMap.containsKey(spec.getPartition())) {
+            if (!copyRouteSpecMap.containsKey(spec.getPartition())
+                || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
                 LOG.error("Metadata calculation error: remove non-existing StreamRouterSpec " + spec);
             } else {
                 inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
@@ -158,7 +160,8 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
 
         // modified StreamRouterSpec, i.e. there is modified StreamPartition, for example WorkSlotQueue assignment is changed
         for (StreamRouterSpec spec : modified) {
-            if (!copyRouteSpecMap.containsKey(spec.getPartition())) {
+            if (!copyRouteSpecMap.containsKey(spec.getPartition())
+                || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
                 LOG.error("Metadata calculation error: modify nonexisting StreamRouterSpec " + spec);
             } else {
                 inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
@@ -171,19 +174,22 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
         routePartitionerMap = copyRoutePartitionerMap;
     }
 
-    private void inplaceRemove(Map<StreamPartition, StreamRouterSpec> routeSpecMap,
+    private void inplaceRemove(Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap,
                                Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
                                StreamRouterSpec toBeRemoved) {
         routeSpecMap.remove(toBeRemoved.getPartition());
         routePartitionerMap.remove(toBeRemoved.getPartition());
     }
 
-    private void inplaceAdd(Map<StreamPartition, StreamRouterSpec> routeSpecMap,
+    private void inplaceAdd(Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap,
                             Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
                             StreamRouterSpec toBeAdded, Map<String, StreamDefinition> sds) {
-        routeSpecMap.put(toBeAdded.getPartition(), toBeAdded);
+        if (!routeSpecMap.containsKey(toBeAdded.getPartition())) {
+            routeSpecMap.put(toBeAdded.getPartition(), new ArrayList<StreamRouterSpec>());
+        }
+        routeSpecMap.get(toBeAdded.getPartition()).add(toBeAdded);
         try {
-            List<StreamRoutePartitioner> routePartitioners = calculatePartitioner(toBeAdded, sds);
+            List<StreamRoutePartitioner> routePartitioners = calculatePartitioner(toBeAdded, sds, routePartitionerMap);
             routePartitionerMap.put(toBeAdded.getPartition(), routePartitioners);
         } catch (Exception e) {
             LOG.error("ignore this failure StreamRouterSpec " + toBeAdded + ", with error" + e.getMessage(), e);
@@ -192,8 +198,13 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
         }
     }
 
-    private List<StreamRoutePartitioner> calculatePartitioner(StreamRouterSpec streamRouterSpec, Map<String, StreamDefinition> sds) throws Exception {
-        List<StreamRoutePartitioner> routePartitioners = new ArrayList<>();
+    private List<StreamRoutePartitioner> calculatePartitioner(StreamRouterSpec streamRouterSpec,
+                                                              Map<String, StreamDefinition> sds,
+                                                              Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap) throws Exception {
+        List<StreamRoutePartitioner> routePartitioners = routePartitionerMap.get(streamRouterSpec.getPartition());
+        if (routePartitioners == null) {
+            routePartitioners = new ArrayList<>();
+        }
         for (PolicyWorkerQueue pwq : streamRouterSpec.getTargetQueue()) {
             routePartitioners.add(StreamRoutePartitionFactory.createRoutePartitioner(
                 Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId),

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cb6c0cb0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
index c85d183..1236b7f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,29 +16,39 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
 import org.apache.eagle.alert.coordination.model.RouterSpec;
 import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
 import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.MetadataType;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
 import org.apache.eagle.alert.engine.router.StreamRouter;
 import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
 import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
 import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
 import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
 import org.apache.eagle.alert.utils.AlertConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
 
 import backtype.storm.metric.api.MultiCountMetric;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
-import org.apache.commons.collections.CollectionUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
 
 public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider {
     private static final Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class);
@@ -48,7 +58,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
     // mapping from StreamPartition to StreamSortSpec
     private volatile Map<StreamPartition, StreamSortSpec> cachedSSS = new HashMap<>();
     // mapping from StreamPartition(streamId, groupbyspec) to StreamRouterSpec
-    private volatile Map<StreamPartition, StreamRouterSpec> cachedSRS = new HashMap<>();
+    private volatile Map<StreamPartition, List<StreamRouterSpec>> cachedSRS = new HashMap<>();
 
     public StreamRouterBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService) {
         super(boltId, changeNotifyService, config);
@@ -127,8 +137,13 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
         cachedSSS = newSSS;
 
         // figure out added, removed, modified StreamRouterSpec
-        Map<StreamPartition, StreamRouterSpec> newSRS = new HashMap<>();
-        spec.getRouterSpecs().forEach(t -> newSRS.put(t.getPartition(), t));
+        Map<StreamPartition, List<StreamRouterSpec>> newSRS = new HashMap<>();
+        spec.getRouterSpecs().forEach(t -> {
+            if (!newSRS.containsKey(t.getPartition())) {
+                newSRS.put(t.getPartition(), new ArrayList<StreamRouterSpec>());
+            }
+            newSRS.get(t.getPartition()).add(t);
+        });
 
         Set<StreamPartition> newStreamPartitions = newSRS.keySet();
         Set<StreamPartition> cachedStreamPartitions = cachedSRS.keySet();
@@ -140,11 +155,11 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
         Collection<StreamRouterSpec> addedRouterSpecs = new ArrayList<>();
         Collection<StreamRouterSpec> removedRouterSpecs = new ArrayList<>();
         Collection<StreamRouterSpec> modifiedRouterSpecs = new ArrayList<>();
-        addedStreamPartitions.forEach(s -> addedRouterSpecs.add(newSRS.get(s)));
-        removedStreamPartitions.forEach(s -> removedRouterSpecs.add(cachedSRS.get(s)));
+        addedStreamPartitions.forEach(s -> addedRouterSpecs.addAll(newSRS.get(s)));
+        removedStreamPartitions.forEach(s -> removedRouterSpecs.addAll(cachedSRS.get(s)));
         modifiedStreamPartitions.forEach(s -> {
-            if (!newSRS.get(s).equals(cachedSRS.get(s))) { // this means StreamRouterSpec is changed for one specific StreamPartition
-                modifiedRouterSpecs.add(newSRS.get(s));
+            if (!CollectionUtils.isEqualCollection(newSRS.get(s), cachedSRS.get(s))) { // this means StreamRouterSpec is changed for one specific StreamPartition
+                modifiedRouterSpecs.addAll(newSRS.get(s));
             }
         });