You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/12/06 02:13:28 UTC

incubator-eagle git commit: [EAGLE-810] create interface StreamOutputCollector to abstract output…

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 0d1dcc408 -> 976edcd86


[EAGLE-810] create interface StreamOutputCollector to abstract output\u2026

  - create interface StreamOutputCollector to abstract outputcollector

https://issues.apache.org/jira/browse/EAGLE-810

Author: r7raul1984 <ta...@yhd.com>

Closes #696 from r7raul1984/EAGLE-810.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/976edcd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/976edcd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/976edcd8

Branch: refs/heads/master
Commit: 976edcd868b3b90327efe34ee088db579dfc6e89
Parents: 0d1dcc4
Author: r7raul1984 <ta...@yhd.com>
Authored: Tue Dec 6 10:13:22 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Dec 6 10:13:22 2016 +0800

----------------------------------------------------------------------
 ...ertBoltOutputCollectorThreadSafeWrapper.java |  6 +-
 .../impl/AlertBoltOutputCollectorWrapper.java   |  6 +-
 .../engine/router/StreamOutputCollector.java    | 33 ++++++++++
 .../router/impl/StormOutputCollector.java       | 65 ++++++++++++++++++++
 .../impl/StreamRouterBoltOutputCollector.java   | 41 +++++-------
 .../eagle/alert/engine/runner/AlertBolt.java    |  5 +-
 .../alert/engine/runner/StreamRouterBolt.java   |  3 +-
 ...oltOutputCollectorThreadSafeWrapperTest.java |  3 +-
 .../TestStreamRouterBoltOutputCollector.java    | 31 ++++++----
 9 files changed, 144 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
index 33d502e..185853d 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
@@ -18,7 +18,7 @@ package org.apache.eagle.alert.engine.evaluator.impl;
 
 import org.apache.eagle.alert.engine.AlertStreamCollector;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import backtype.storm.task.OutputCollector;
+import org.apache.eagle.alert.engine.router.StreamOutputCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,14 +40,14 @@ import java.util.concurrent.atomic.AtomicLong;
  * </ul>
  */
 public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCollector {
-    private final OutputCollector delegate;
+    private final StreamOutputCollector delegate;
     private final LinkedBlockingQueue<AlertStreamEvent> queue;
     private static final Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorThreadSafeWrapper.class);
     private final AtomicLong lastFlushTime = new AtomicLong(System.currentTimeMillis());
     private final AutoAlertFlusher flusher;
     private static final int MAX_ALERT_DELAY_SECS = 10;
 
-    public AlertBoltOutputCollectorThreadSafeWrapper(OutputCollector outputCollector) {
+    public AlertBoltOutputCollectorThreadSafeWrapper(StreamOutputCollector outputCollector) {
         this.delegate = outputCollector;
         this.queue = new LinkedBlockingQueue<>();
         this.flusher = new AutoAlertFlusher(this);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
index af2b9f8..3053e6e 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
@@ -25,22 +25,22 @@ import org.apache.eagle.alert.engine.AlertStreamCollector;
 import org.apache.eagle.alert.engine.StreamContext;
 import org.apache.eagle.alert.engine.coordinator.PublishPartition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.router.StreamOutputCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.task.OutputCollector;
 
 public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
 
     private static final Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorWrapper.class);
 
-    private final OutputCollector delegate;
+    private final StreamOutputCollector delegate;
     private final Object outputLock;
     private final StreamContext streamContext;
 
     private volatile Set<PublishPartition> publishPartitions;
 
-    public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, Object outputLock,
+    public AlertBoltOutputCollectorWrapper(StreamOutputCollector outputCollector, Object outputLock,
                                            StreamContext streamContext) {
         this.delegate = outputCollector;
         this.outputLock = outputLock;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
new file mode 100644
index 0000000..88ffadb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.router;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+import java.util.List;
+
+
+public interface StreamOutputCollector {
+    void emit(String streamId, PartitionedEvent partitionedEvent) throws Exception;
+
+    void emit(List<Object> tuple);
+
+    void ack(PartitionedEvent partitionedEvent);
+
+    void fail(PartitionedEvent partitionedEvent);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
new file mode 100644
index 0000000..7b8f344
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.router.impl;
+
+import backtype.storm.task.OutputCollector;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.router.StreamOutputCollector;
+import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+
+import java.util.Collections;
+import java.util.List;
+
+public class StormOutputCollector implements StreamOutputCollector {
+
+    private final OutputCollector outputCollector;
+    private final PartitionedEventSerializer serializer;
+
+    public StormOutputCollector(OutputCollector outputCollector, PartitionedEventSerializer serializer) {
+        this.outputCollector = outputCollector;
+        this.serializer = serializer;
+    }
+
+    public StormOutputCollector(OutputCollector outputCollector) {
+        this(outputCollector, null);
+    }
+
+    @Override
+    public void emit(String streamId, PartitionedEvent partitionedEvent) throws Exception {
+        if (this.serializer == null) {
+            outputCollector.emit(streamId, Collections.singletonList(partitionedEvent.getAnchor()), Collections.singletonList(partitionedEvent));
+        } else {
+            outputCollector.emit(streamId, Collections.singletonList(partitionedEvent.getAnchor()), Collections.singletonList(serializer.serialize(partitionedEvent)));
+        }
+    }
+
+    @Override
+    public void emit(List<Object> tuple) {
+        outputCollector.emit(tuple);
+    }
+
+    @Override
+    public void ack(PartitionedEvent partitionedEvent) {
+        outputCollector.ack(partitionedEvent.getAnchor());
+    }
+
+    @Override
+    public void fail(PartitionedEvent partitionedEvent) {
+        this.outputCollector.fail(partitionedEvent.getAnchor());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
index 77e8daa..2eb101a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
@@ -18,6 +18,7 @@
  */
 package org.apache.eagle.alert.engine.router.impl;
 
+import com.google.common.collect.Lists;
 import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
 import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
 import org.apache.eagle.alert.coordination.model.WorkSlot;
@@ -27,14 +28,8 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.StreamRoute;
-import org.apache.eagle.alert.engine.router.StreamRoutePartitionFactory;
-import org.apache.eagle.alert.engine.router.StreamRoutePartitioner;
-import org.apache.eagle.alert.engine.router.StreamRouteSpecListener;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+import org.apache.eagle.alert.engine.router.*;
 import org.apache.eagle.alert.utils.StreamIdConversion;
-import backtype.storm.task.OutputCollector;
-import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,23 +42,21 @@ import java.util.*;
  */
 public class StreamRouterBoltOutputCollector implements PartitionedEventCollector, StreamRouteSpecListener {
     private static final Logger LOG = LoggerFactory.getLogger(StreamRouterBoltOutputCollector.class);
-    private final OutputCollector outputCollector;
+    private final StreamOutputCollector outputCollector;
     private final Object outputLock = new Object();
     //    private final List<String> outputStreamIds;
     private final StreamContext streamContext;
-    private final PartitionedEventSerializer serializer;
     private volatile Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap;
     private volatile Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap;
     private final String sourceId;
 
-    public StreamRouterBoltOutputCollector(String sourceId, OutputCollector outputCollector, List<String> outputStreamIds, StreamContext streamContext, PartitionedEventSerializer serializer) {
+    public StreamRouterBoltOutputCollector(String sourceId, StreamOutputCollector outputCollector, List<String> outputStreamIds, StreamContext streamContext) {
         this.sourceId = sourceId;
         this.outputCollector = outputCollector;
         this.routeSpecMap = new HashMap<>();
         this.routePartitionerMap = new HashMap<>();
         // this.outputStreamIds = outputStreamIds;
         this.streamContext = streamContext;
-        this.serializer = serializer;
     }
 
     public void emit(PartitionedEvent event) {
@@ -84,7 +77,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
                 LOG.error("Partitioner for " + routerSpecs.get(0) + " is null");
                 synchronized (outputLock) {
                     this.streamContext.counter().incr("fail_count");
-                    this.outputCollector.fail(event.getAnchor());
+                    this.outputCollector.fail(event);
                 }
                 return;
             }
@@ -106,11 +99,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("Emitted to stream {} with message {}", targetStreamId, emittedEvent);
                             }
-                            if (this.serializer == null) {
-                                outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(emittedEvent));
-                            } else {
-                                outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(serializer.serialize(emittedEvent)));
-                            }
+                            outputCollector.emit(targetStreamId, event);
                             this.streamContext.counter().incr("emit_count");
                         } catch (RuntimeException ex) {
                             this.streamContext.counter().incr("fail_count");
@@ -119,13 +108,13 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
                         }
                     }
                 }
-                outputCollector.ack(event.getAnchor());
+                outputCollector.ack(event);
             }
         } catch (Exception ex) {
             LOG.error(ex.getMessage(), ex);
             synchronized (outputLock) {
                 this.streamContext.counter().incr("fail_count");
-                this.outputCollector.fail(event.getAnchor());
+                this.outputCollector.fail(event);
             }
         }
     }
@@ -141,7 +130,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
         // added StreamRouterSpec i.e. there is a new StreamPartition
         for (StreamRouterSpec spec : added) {
             if (copyRouteSpecMap.containsKey(spec.getPartition())
-                && copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
+                    && copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
                 LOG.error("Metadata calculation error: add existing StreamRouterSpec " + spec);
             } else {
                 inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
@@ -151,7 +140,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
         // removed StreamRouterSpec i.e. there is a deleted StreamPartition
         for (StreamRouterSpec spec : removed) {
             if (!copyRouteSpecMap.containsKey(spec.getPartition())
-                || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
+                    || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
                 LOG.error("Metadata calculation error: remove non-existing StreamRouterSpec " + spec);
             } else {
                 inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
@@ -161,7 +150,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
         // modified StreamRouterSpec, i.e. there is modified StreamPartition, for example WorkSlotQueue assignment is changed
         for (StreamRouterSpec spec : modified) {
             if (!copyRouteSpecMap.containsKey(spec.getPartition())
-                || copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
+                    || copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
                 LOG.error("Metadata calculation error: modify nonexisting StreamRouterSpec " + spec);
             } else {
                 inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
@@ -207,9 +196,9 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
         }
         for (PolicyWorkerQueue pwq : streamRouterSpec.getTargetQueue()) {
             routePartitioners.add(StreamRoutePartitionFactory.createRoutePartitioner(
-                Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId),
-                sds.get(streamRouterSpec.getPartition().getStreamId()),
-                streamRouterSpec.getPartition()));
+                    Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId),
+                    sds.get(streamRouterSpec.getPartition().getStreamId()),
+                    streamRouterSpec.getPartition()));
         }
         return routePartitioners;
     }
@@ -219,7 +208,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
         synchronized (outputLock) {
             this.streamContext.counter().incr("drop_count");
             if (event.getAnchor() != null) {
-                this.outputCollector.ack(event.getAnchor());
+                this.outputCollector.ack(event);
             } else {
                 throw new IllegalStateException(event.toString() + " was not acked as anchor is null");
             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index c946fee..02bc47e 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -38,6 +38,7 @@ import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrap
 import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
+import org.apache.eagle.alert.engine.router.impl.StormOutputCollector;
 import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
 import org.apache.eagle.alert.engine.utils.SingletonExecutor;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
@@ -102,7 +103,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
                 pe.getEvent().setMetaVersion(specVersion);
             } else if (streamEventVersion != null && !streamEventVersion.equals(specVersion)) {
                 if (specVersion != null && streamEventVersion != null
-                    && specVersion.contains("spec_version_") && streamEventVersion.contains("spec_version_")) {
+                        && specVersion.contains("spec_version_") && streamEventVersion.contains("spec_version_")) {
                     // check if specVersion is older than stream_event_version
                     // Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]);
                     // Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]);
@@ -161,7 +162,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
         // instantiate output lock object
         outputLock = new Object();
         streamContext = new StreamContextImpl(config, context.registerMetric("eagle.evaluator", new MultiCountMetric(), 60), context);
-        alertOutputCollector = new AlertBoltOutputCollectorWrapper(collector, outputLock, streamContext);
+        alertOutputCollector = new AlertBoltOutputCollectorWrapper(new StormOutputCollector(collector), outputLock, streamContext);
         policyGroupEvaluator.init(streamContext, alertOutputCollector);
         metadataChangeNotifyService.registerListener(this);
         metadataChangeNotifyService.init(config, MetadataType.ALERT_BOLT);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
index 7acd7e4..29ee771 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -36,6 +36,7 @@ import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
 import org.apache.eagle.alert.engine.router.StreamRouter;
 import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
+import org.apache.eagle.alert.engine.router.impl.StormOutputCollector;
 import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
 import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
 import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
@@ -69,7 +70,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
     @Override
     public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService changeNotifyService, Config config, TopologyContext context) {
         streamContext = new StreamContextImpl(config, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context);
-        routeCollector = new StreamRouterBoltOutputCollector(getBoltId(), collector, this.getOutputStreamIds(), streamContext, serializer);
+        routeCollector = new StreamRouterBoltOutputCollector(getBoltId(), new StormOutputCollector(collector, serializer), this.getOutputStreamIds(), streamContext);
         router.prepare(streamContext, routeCollector);
         changeNotifyService.registerListener(this);
         changeNotifyService.init(config, MetadataType.STREAM_ROUTER_BOLT);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
index f356931..4552417 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
@@ -20,6 +20,7 @@ import backtype.storm.task.IOutputCollector;
 import backtype.storm.task.OutputCollector;
 import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorThreadSafeWrapper;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.router.impl.StormOutputCollector;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -32,7 +33,7 @@ public class AlertBoltOutputCollectorThreadSafeWrapperTest {
     @Test
     public void testThreadSafeAlertBoltOutputCollector() {
         MockedStormAlertOutputCollector stormOutputCollector = new MockedStormAlertOutputCollector(null);
-        AlertBoltOutputCollectorThreadSafeWrapper alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorThreadSafeWrapper(stormOutputCollector);
+        AlertBoltOutputCollectorThreadSafeWrapper alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorThreadSafeWrapper(new StormOutputCollector(stormOutputCollector));
         alertBoltOutputCollectorWrapper.emit(create("mockAlert_1"));
         alertBoltOutputCollectorWrapper.emit(create("mockAlert_2"));
         Assert.assertEquals(0, stormOutputCollector.getCollected().size());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
index fd8ad61..704857d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
@@ -34,6 +34,7 @@ import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.router.impl.StormOutputCollector;
 import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
 import org.junit.Assert;
 import org.junit.Test;
@@ -53,7 +54,7 @@ public class TestStreamRouterBoltOutputCollector {
         StreamPartition partition = new StreamPartition();
         partition.setStreamId(streamId);
         partition.setType(StreamPartition.Type.GROUPBY);
-        partition.setColumns(new ArrayList<String>(){{
+        partition.setColumns(new ArrayList<String>() {{
             add("col1");
         }});
 
@@ -63,22 +64,26 @@ public class TestStreamRouterBoltOutputCollector {
 
         PolicyWorkerQueue queue1 = new PolicyWorkerQueue();
         queue1.setPartition(partition);
-        queue1.setWorkers(new ArrayList<WorkSlot>(){ {
-            add(worker1);
-        }} );
+        queue1.setWorkers(new ArrayList<WorkSlot>() {
+            {
+                add(worker1);
+            }
+        });
 
         PolicyWorkerQueue queue2 = new PolicyWorkerQueue();
         queue2.setPartition(partition);
-        queue2.setWorkers(new ArrayList<WorkSlot>(){ {
-            add(worker1);
-            add(worker2);
-        }} );
+        queue2.setWorkers(new ArrayList<WorkSlot>() {
+            {
+                add(worker1);
+                add(worker2);
+            }
+        });
 
         StreamRouterSpec spec1 = new StreamRouterSpec();
         spec1.setStreamId(streamId);
         spec1.setPartition(partition);
 
-        spec1.setTargetQueue(new ArrayList<PolicyWorkerQueue>(){{
+        spec1.setTargetQueue(new ArrayList<PolicyWorkerQueue>() {{
             add(queue1);
         }});
 
@@ -86,7 +91,7 @@ public class TestStreamRouterBoltOutputCollector {
         spec2.setStreamId(streamId);
         spec2.setPartition(partition);
 
-        spec2.setTargetQueue(new ArrayList<PolicyWorkerQueue>(){{
+        spec2.setTargetQueue(new ArrayList<PolicyWorkerQueue>() {{
             add(queue2);
         }});
 
@@ -138,7 +143,7 @@ public class TestStreamRouterBoltOutputCollector {
 
         // create two events
         StreamEvent event1 = new StreamEvent();
-        Object[] data = new Object[] {"value1"};
+        Object[] data = new Object[]{"value1"};
         event1.setData(data);
         event1.setStreamId(streamId);
         PartitionedEvent pEvent1 = new PartitionedEvent();
@@ -146,7 +151,7 @@ public class TestStreamRouterBoltOutputCollector {
         pEvent1.setPartition(partition);
 
         StreamEvent event2 = new StreamEvent();
-        Object[] data2 = new Object[] {"value3"};
+        Object[] data2 = new Object[]{"value3"};
         event2.setData(data2);
         event2.setStreamId(streamId);
         PartitionedEvent pEvent2 = new PartitionedEvent();
@@ -156,7 +161,7 @@ public class TestStreamRouterBoltOutputCollector {
         TopologyContext context = Mockito.mock(TopologyContext.class);
         when(context.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
         StreamContext streamContext = new StreamContextImpl(null, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context);
-        StreamRouterBoltOutputCollector collector = new StreamRouterBoltOutputCollector("test", new OutputCollector(delegate), null, streamContext, null);
+        StreamRouterBoltOutputCollector collector = new StreamRouterBoltOutputCollector("test", new StormOutputCollector(new OutputCollector(delegate), null), null, streamContext);
 
         // add a StreamRouterSpec which has one worker
         collector.onStreamRouterSpecChange(list1, new ArrayList<>(), new ArrayList<>(), sds);