You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/10/14 09:31:37 UTC
incubator-eagle git commit: [EAGLE-620]: AlertEngine: SpoutWrapper
are sending duplicated message
Repository: incubator-eagle
Updated Branches:
refs/heads/master c6ac2eb6f -> 2e715e3e9
[EAGLE-620]: AlertEngine: SpoutWrapper are sending duplicated message
Author: ralphsu
This closes #510
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/2e715e3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/2e715e3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/2e715e3e
Branch: refs/heads/master
Commit: 2e715e3e94a6b98e0f3073d5d384878a0a845b72
Parents: c6ac2eb
Author: Ralph, Su <su...@gmail.com>
Authored: Fri Oct 14 17:30:27 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Fri Oct 14 17:32:25 2016 +0800
----------------------------------------------------------------------
.../alert/coordinator/impl/MonitorMetadataGenerator.java | 2 +-
.../alert/engine/spout/SpoutOutputCollectorWrapper.java | 9 ++++++---
2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e715e3e/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
index c5c992b..3f64f86 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
@@ -265,7 +265,7 @@ public class MonitorMetadataGenerator {
}
}
if (targetSm == null) {
- targetSm = new StreamRepartitionMetadata(datasourceName, schema.getStreamId());
+ targetSm = new StreamRepartitionMetadata(topicName, schema.getStreamId());
dsStreamMeta.add(targetSm);
}
if (!targetSm.groupingStrategies.contains(gs)) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e715e3e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
index a8dcc0d..e205da4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
@@ -18,6 +18,9 @@
*/
package org.apache.eagle.alert.engine.spout;
+import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.eagle.alert.coordination.model.SpoutSpec;
import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
@@ -29,9 +32,6 @@ import org.apache.eagle.alert.engine.model.StreamEvent;
import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
import org.apache.eagle.alert.utils.StreamIdConversion;
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,6 +121,9 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
phase 2: stream repartition
*/
for (StreamRepartitionMetadata md : streamRepartitionMetadataList) {
+ if (!event.getStreamId().equals(md.getStreamId())) {
+ continue;
+ }
// one stream may have multiple group-by strategies, each strategy is for a specific group-by
for (StreamRepartitionStrategy groupingStrategy : md.groupingStrategies) {
int hash = 0;