You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/10/21 07:29:52 UTC
incubator-eagle git commit: EAGLE-648: Stream router does not work properly sometimes
Repository: incubator-eagle
Updated Branches:
refs/heads/master eafff46c3 -> cb6c0cb02
EAGLE-648: Stream router does not work properly sometimes
Author: Li, Garrett
Reviewer: ralphsu
This closes #541
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/cb6c0cb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/cb6c0cb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/cb6c0cb0
Branch: refs/heads/master
Commit: cb6c0cb02410245f8e65846a0fb5a71066e10eec
Parents: eafff46
Author: Xiancheng Li <xi...@ebay.com>
Authored: Thu Oct 20 18:06:38 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Fri Oct 21 14:25:44 2016 +0800
----------------------------------------------------------------------
.../impl/StreamRouterBoltOutputCollector.java | 43 ++++++++++-------
.../alert/engine/runner/StreamRouterBolt.java | 49 +++++++++++++-------
2 files changed, 59 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cb6c0cb0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
index ae678ea..11d4d9e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
@@ -52,7 +52,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
// private final List<String> outputStreamIds;
private final StreamContext streamContext;
private final PartitionedEventSerializer serializer;
- private volatile Map<StreamPartition, StreamRouterSpec> routeSpecMap;
+ private volatile Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap;
private volatile Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap;
private final String sourceId;
@@ -70,8 +70,8 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
try {
this.streamContext.counter().scope("send_count").incr();
StreamPartition partition = event.getPartition();
- StreamRouterSpec routerSpec = routeSpecMap.get(partition);
- if (routerSpec == null) {
+ List<StreamRouterSpec> routerSpecs = routeSpecMap.get(partition);
+ if (routerSpecs == null || routerSpecs.size() <= 0) {
if (LOG.isDebugEnabled()) {
// Don't know how to route stream, if it's correct, it's better to filter useless stream in spout side
LOG.debug("Drop event {} as StreamPartition {} is not pointed to any router metadata {}", event, event.getPartition(), routeSpecMap);
@@ -80,8 +80,8 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
return;
}
- if (routePartitionerMap.get(routerSpec.getPartition()) == null) {
- LOG.error("Partitioner for " + routerSpec + " is null");
+ if (routePartitionerMap.get(partition) == null) {
+ LOG.error("Partitioner for " + routerSpecs.get(0) + " is null");
synchronized (outputLock) {
this.streamContext.counter().scope("fail_count").incr();
this.outputCollector.fail(event.getAnchor());
@@ -101,7 +101,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
for (StreamRoute streamRoute : streamRoutes) {
String targetStreamId = StreamIdConversion.generateStreamIdBetween(sourceId, streamRoute.getTargetComponentId());
try {
- PartitionedEvent emittedEvent = new PartitionedEvent(newEvent, routerSpec.getPartition(), streamRoute.getPartitionKey());
+ PartitionedEvent emittedEvent = new PartitionedEvent(newEvent, partition, streamRoute.getPartitionKey());
// Route Target Stream id instead of component id
if (LOG.isDebugEnabled()) {
LOG.debug("Emitted to stream {} with message {}", targetStreamId, emittedEvent);
@@ -135,12 +135,13 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
Collection<StreamRouterSpec> removed,
Collection<StreamRouterSpec> modified,
Map<String, StreamDefinition> sds) {
- Map<StreamPartition, StreamRouterSpec> copyRouteSpecMap = new HashMap<>(routeSpecMap);
+ Map<StreamPartition, List<StreamRouterSpec>> copyRouteSpecMap = new HashMap<>(routeSpecMap);
Map<StreamPartition, List<StreamRoutePartitioner>> copyRoutePartitionerMap = new HashMap<>(routePartitionerMap);
// added StreamRouterSpec i.e. there is a new StreamPartition
for (StreamRouterSpec spec : added) {
- if (copyRouteSpecMap.containsKey(spec.getPartition())) {
+ if (copyRouteSpecMap.containsKey(spec.getPartition())
+ && copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
LOG.error("Metadata calculation error: add existing StreamRouterSpec " + spec);
} else {
inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
@@ -149,7 +150,8 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
// removed StreamRouterSpec i.e. there is a deleted StreamPartition
for (StreamRouterSpec spec : removed) {
- if (!copyRouteSpecMap.containsKey(spec.getPartition())) {
+ if (!copyRouteSpecMap.containsKey(spec.getPartition())
+ || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
LOG.error("Metadata calculation error: remove non-existing StreamRouterSpec " + spec);
} else {
inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
@@ -158,7 +160,8 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
// modified StreamRouterSpec, i.e. there is modified StreamPartition, for example WorkSlotQueue assignment is changed
for (StreamRouterSpec spec : modified) {
- if (!copyRouteSpecMap.containsKey(spec.getPartition())) {
+ if (!copyRouteSpecMap.containsKey(spec.getPartition())
+ || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
LOG.error("Metadata calculation error: modify nonexisting StreamRouterSpec " + spec);
} else {
inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
@@ -171,19 +174,22 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
routePartitionerMap = copyRoutePartitionerMap;
}
- private void inplaceRemove(Map<StreamPartition, StreamRouterSpec> routeSpecMap,
+ private void inplaceRemove(Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap,
Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
StreamRouterSpec toBeRemoved) {
routeSpecMap.remove(toBeRemoved.getPartition());
routePartitionerMap.remove(toBeRemoved.getPartition());
}
- private void inplaceAdd(Map<StreamPartition, StreamRouterSpec> routeSpecMap,
+ private void inplaceAdd(Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap,
Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
StreamRouterSpec toBeAdded, Map<String, StreamDefinition> sds) {
- routeSpecMap.put(toBeAdded.getPartition(), toBeAdded);
+ if (!routeSpecMap.containsKey(toBeAdded.getPartition())) {
+ routeSpecMap.put(toBeAdded.getPartition(), new ArrayList<StreamRouterSpec>());
+ }
+ routeSpecMap.get(toBeAdded.getPartition()).add(toBeAdded);
try {
- List<StreamRoutePartitioner> routePartitioners = calculatePartitioner(toBeAdded, sds);
+ List<StreamRoutePartitioner> routePartitioners = calculatePartitioner(toBeAdded, sds, routePartitionerMap);
routePartitionerMap.put(toBeAdded.getPartition(), routePartitioners);
} catch (Exception e) {
LOG.error("ignore this failure StreamRouterSpec " + toBeAdded + ", with error" + e.getMessage(), e);
@@ -192,8 +198,13 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
}
}
- private List<StreamRoutePartitioner> calculatePartitioner(StreamRouterSpec streamRouterSpec, Map<String, StreamDefinition> sds) throws Exception {
- List<StreamRoutePartitioner> routePartitioners = new ArrayList<>();
+ private List<StreamRoutePartitioner> calculatePartitioner(StreamRouterSpec streamRouterSpec,
+ Map<String, StreamDefinition> sds,
+ Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap) throws Exception {
+ List<StreamRoutePartitioner> routePartitioners = routePartitionerMap.get(streamRouterSpec.getPartition());
+ if (routePartitioners == null) {
+ routePartitioners = new ArrayList<>();
+ }
for (PolicyWorkerQueue pwq : streamRouterSpec.getTargetQueue()) {
routePartitioners.add(StreamRoutePartitionFactory.createRoutePartitioner(
Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId),
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cb6c0cb0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
index c85d183..1236b7f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- * <p/>
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,29 +16,39 @@
*/
package org.apache.eagle.alert.engine.runner;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
import org.apache.eagle.alert.coordination.model.RouterSpec;
import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
import org.apache.eagle.alert.engine.StreamContextImpl;
-import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.MetadataType;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
import org.apache.eagle.alert.engine.router.StreamRouter;
import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
import org.apache.eagle.alert.utils.AlertConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
import backtype.storm.metric.api.MultiCountMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
-import org.apache.commons.collections.CollectionUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider {
private static final Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class);
@@ -48,7 +58,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
// mapping from StreamPartition to StreamSortSpec
private volatile Map<StreamPartition, StreamSortSpec> cachedSSS = new HashMap<>();
// mapping from StreamPartition(streamId, groupbyspec) to StreamRouterSpec
- private volatile Map<StreamPartition, StreamRouterSpec> cachedSRS = new HashMap<>();
+ private volatile Map<StreamPartition, List<StreamRouterSpec>> cachedSRS = new HashMap<>();
public StreamRouterBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService) {
super(boltId, changeNotifyService, config);
@@ -127,8 +137,13 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
cachedSSS = newSSS;
// figure out added, removed, modified StreamRouterSpec
- Map<StreamPartition, StreamRouterSpec> newSRS = new HashMap<>();
- spec.getRouterSpecs().forEach(t -> newSRS.put(t.getPartition(), t));
+ Map<StreamPartition, List<StreamRouterSpec>> newSRS = new HashMap<>();
+ spec.getRouterSpecs().forEach(t -> {
+ if (!newSRS.containsKey(t.getPartition())) {
+ newSRS.put(t.getPartition(), new ArrayList<StreamRouterSpec>());
+ }
+ newSRS.get(t.getPartition()).add(t);
+ });
Set<StreamPartition> newStreamPartitions = newSRS.keySet();
Set<StreamPartition> cachedStreamPartitions = cachedSRS.keySet();
@@ -140,11 +155,11 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
Collection<StreamRouterSpec> addedRouterSpecs = new ArrayList<>();
Collection<StreamRouterSpec> removedRouterSpecs = new ArrayList<>();
Collection<StreamRouterSpec> modifiedRouterSpecs = new ArrayList<>();
- addedStreamPartitions.forEach(s -> addedRouterSpecs.add(newSRS.get(s)));
- removedStreamPartitions.forEach(s -> removedRouterSpecs.add(cachedSRS.get(s)));
+ addedStreamPartitions.forEach(s -> addedRouterSpecs.addAll(newSRS.get(s)));
+ removedStreamPartitions.forEach(s -> removedRouterSpecs.addAll(cachedSRS.get(s)));
modifiedStreamPartitions.forEach(s -> {
- if (!newSRS.get(s).equals(cachedSRS.get(s))) { // this means StreamRouterSpec is changed for one specific StreamPartition
- modifiedRouterSpecs.add(newSRS.get(s));
+ if (!CollectionUtils.isEqualCollection(newSRS.get(s), cachedSRS.get(s))) { // this means StreamRouterSpec is changed for one specific StreamPartition
+ modifiedRouterSpecs.addAll(newSRS.get(s));
}
});