You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/12/06 02:13:28 UTC
incubator-eagle git commit: [EAGLE-810] create interface StreamOutputCollector to abstract output…
Repository: incubator-eagle
Updated Branches:
refs/heads/master 0d1dcc408 -> 976edcd86
[EAGLE-810] create interface StreamOutputCollector to abstract output…
- create interface StreamOutputCollector to abstract outputcollector
https://issues.apache.org/jira/browse/EAGLE-810
Author: r7raul1984 <ta...@yhd.com>
Closes #696 from r7raul1984/EAGLE-810.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/976edcd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/976edcd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/976edcd8
Branch: refs/heads/master
Commit: 976edcd868b3b90327efe34ee088db579dfc6e89
Parents: 0d1dcc4
Author: r7raul1984 <ta...@yhd.com>
Authored: Tue Dec 6 10:13:22 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Dec 6 10:13:22 2016 +0800
----------------------------------------------------------------------
...ertBoltOutputCollectorThreadSafeWrapper.java | 6 +-
.../impl/AlertBoltOutputCollectorWrapper.java | 6 +-
.../engine/router/StreamOutputCollector.java | 33 ++++++++++
.../router/impl/StormOutputCollector.java | 65 ++++++++++++++++++++
.../impl/StreamRouterBoltOutputCollector.java | 41 +++++-------
.../eagle/alert/engine/runner/AlertBolt.java | 5 +-
.../alert/engine/runner/StreamRouterBolt.java | 3 +-
...oltOutputCollectorThreadSafeWrapperTest.java | 3 +-
.../TestStreamRouterBoltOutputCollector.java | 31 ++++++----
9 files changed, 144 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
index 33d502e..185853d 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
@@ -18,7 +18,7 @@ package org.apache.eagle.alert.engine.evaluator.impl;
import org.apache.eagle.alert.engine.AlertStreamCollector;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import backtype.storm.task.OutputCollector;
+import org.apache.eagle.alert.engine.router.StreamOutputCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,14 +40,14 @@ import java.util.concurrent.atomic.AtomicLong;
* </ul>
*/
public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCollector {
- private final OutputCollector delegate;
+ private final StreamOutputCollector delegate;
private final LinkedBlockingQueue<AlertStreamEvent> queue;
private static final Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorThreadSafeWrapper.class);
private final AtomicLong lastFlushTime = new AtomicLong(System.currentTimeMillis());
private final AutoAlertFlusher flusher;
private static final int MAX_ALERT_DELAY_SECS = 10;
- public AlertBoltOutputCollectorThreadSafeWrapper(OutputCollector outputCollector) {
+ public AlertBoltOutputCollectorThreadSafeWrapper(StreamOutputCollector outputCollector) {
this.delegate = outputCollector;
this.queue = new LinkedBlockingQueue<>();
this.flusher = new AutoAlertFlusher(this);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
index af2b9f8..3053e6e 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
@@ -25,22 +25,22 @@ import org.apache.eagle.alert.engine.AlertStreamCollector;
import org.apache.eagle.alert.engine.StreamContext;
import org.apache.eagle.alert.engine.coordinator.PublishPartition;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.router.StreamOutputCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.task.OutputCollector;
public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
private static final Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorWrapper.class);
- private final OutputCollector delegate;
+ private final StreamOutputCollector delegate;
private final Object outputLock;
private final StreamContext streamContext;
private volatile Set<PublishPartition> publishPartitions;
- public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, Object outputLock,
+ public AlertBoltOutputCollectorWrapper(StreamOutputCollector outputCollector, Object outputLock,
StreamContext streamContext) {
this.delegate = outputCollector;
this.outputLock = outputLock;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
new file mode 100644
index 0000000..88ffadb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.router;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+import java.util.List;
+
+/** Abstraction over the output collector, so that callers need not reference Storm's OutputCollector directly. */
+public interface StreamOutputCollector {
+ void emit(String streamId, PartitionedEvent partitionedEvent) throws Exception; // emit the event to the output stream identified by streamId
+
+ void emit(List<Object> tuple); // emit a raw tuple of values
+
+ void ack(PartitionedEvent partitionedEvent); // acknowledge the event (the Storm implementation acks the event's anchor)
+
+ void fail(PartitionedEvent partitionedEvent); // report the event as failed (the Storm implementation fails the event's anchor)
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
new file mode 100644
index 0000000..7b8f344
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.router.impl;
+
+import backtype.storm.task.OutputCollector;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.router.StreamOutputCollector;
+import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+
+import java.util.Collections;
+import java.util.List;
+/** Storm implementation of StreamOutputCollector: forwards every call to a wrapped backtype.storm.task.OutputCollector, optionally serializing emitted events. */
+public class StormOutputCollector implements StreamOutputCollector {
+ // Wrapped Storm collector, plus an optional serializer (null means events are emitted unserialized).
+ private final OutputCollector outputCollector;
+ private final PartitionedEventSerializer serializer;
+ /** Creates a collector that emits through outputCollector; serializer may be null, in which case events are emitted as-is. */
+ public StormOutputCollector(OutputCollector outputCollector, PartitionedEventSerializer serializer) {
+ this.outputCollector = outputCollector;
+ this.serializer = serializer;
+ }
+ /** Creates a collector without a serializer (delegates to the two-argument constructor with null). */
+ public StormOutputCollector(OutputCollector outputCollector) {
+ this(outputCollector, null);
+ }
+ /** Emits the event to streamId as a single-value tuple anchored on the event's anchor; the declared Exception presumably comes from serializer.serialize (TODO confirm). */
+ @Override
+ public void emit(String streamId, PartitionedEvent partitionedEvent) throws Exception {
+ if (this.serializer == null) { // no serializer configured: emit the event object itself
+ outputCollector.emit(streamId, Collections.singletonList(partitionedEvent.getAnchor()), Collections.singletonList(partitionedEvent));
+ } else { // serializer configured: emit the serialized form of the event instead
+ outputCollector.emit(streamId, Collections.singletonList(partitionedEvent.getAnchor()), Collections.singletonList(serializer.serialize(partitionedEvent)));
+ }
+ }
+ /** Passes the tuple straight to the wrapped collector's emit(List). */
+ @Override
+ public void emit(List<Object> tuple) {
+ outputCollector.emit(tuple);
+ }
+ /** Acknowledges the event by acking its anchor on the wrapped collector. */
+ @Override
+ public void ack(PartitionedEvent partitionedEvent) {
+ outputCollector.ack(partitionedEvent.getAnchor());
+ }
+ /** Marks the event as failed by failing its anchor on the wrapped collector. */
+ @Override
+ public void fail(PartitionedEvent partitionedEvent) {
+ this.outputCollector.fail(partitionedEvent.getAnchor());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
index 77e8daa..2eb101a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
@@ -18,6 +18,7 @@
*/
package org.apache.eagle.alert.engine.router.impl;
+import com.google.common.collect.Lists;
import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
import org.apache.eagle.alert.coordination.model.WorkSlot;
@@ -27,14 +28,8 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.StreamRoute;
-import org.apache.eagle.alert.engine.router.StreamRoutePartitionFactory;
-import org.apache.eagle.alert.engine.router.StreamRoutePartitioner;
-import org.apache.eagle.alert.engine.router.StreamRouteSpecListener;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+import org.apache.eagle.alert.engine.router.*;
import org.apache.eagle.alert.utils.StreamIdConversion;
-import backtype.storm.task.OutputCollector;
-import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,23 +42,21 @@ import java.util.*;
*/
public class StreamRouterBoltOutputCollector implements PartitionedEventCollector, StreamRouteSpecListener {
private static final Logger LOG = LoggerFactory.getLogger(StreamRouterBoltOutputCollector.class);
- private final OutputCollector outputCollector;
+ private final StreamOutputCollector outputCollector;
private final Object outputLock = new Object();
// private final List<String> outputStreamIds;
private final StreamContext streamContext;
- private final PartitionedEventSerializer serializer;
private volatile Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap;
private volatile Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap;
private final String sourceId;
- public StreamRouterBoltOutputCollector(String sourceId, OutputCollector outputCollector, List<String> outputStreamIds, StreamContext streamContext, PartitionedEventSerializer serializer) {
+ public StreamRouterBoltOutputCollector(String sourceId, StreamOutputCollector outputCollector, List<String> outputStreamIds, StreamContext streamContext) {
this.sourceId = sourceId;
this.outputCollector = outputCollector;
this.routeSpecMap = new HashMap<>();
this.routePartitionerMap = new HashMap<>();
// this.outputStreamIds = outputStreamIds;
this.streamContext = streamContext;
- this.serializer = serializer;
}
public void emit(PartitionedEvent event) {
@@ -84,7 +77,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
LOG.error("Partitioner for " + routerSpecs.get(0) + " is null");
synchronized (outputLock) {
this.streamContext.counter().incr("fail_count");
- this.outputCollector.fail(event.getAnchor());
+ this.outputCollector.fail(event);
}
return;
}
@@ -106,11 +99,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
if (LOG.isDebugEnabled()) {
LOG.debug("Emitted to stream {} with message {}", targetStreamId, emittedEvent);
}
- if (this.serializer == null) {
- outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(emittedEvent));
- } else {
- outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(serializer.serialize(emittedEvent)));
- }
+ outputCollector.emit(targetStreamId, event);
this.streamContext.counter().incr("emit_count");
} catch (RuntimeException ex) {
this.streamContext.counter().incr("fail_count");
@@ -119,13 +108,13 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
}
}
}
- outputCollector.ack(event.getAnchor());
+ outputCollector.ack(event);
}
} catch (Exception ex) {
LOG.error(ex.getMessage(), ex);
synchronized (outputLock) {
this.streamContext.counter().incr("fail_count");
- this.outputCollector.fail(event.getAnchor());
+ this.outputCollector.fail(event);
}
}
}
@@ -141,7 +130,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
// added StreamRouterSpec i.e. there is a new StreamPartition
for (StreamRouterSpec spec : added) {
if (copyRouteSpecMap.containsKey(spec.getPartition())
- && copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
+ && copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
LOG.error("Metadata calculation error: add existing StreamRouterSpec " + spec);
} else {
inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
@@ -151,7 +140,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
// removed StreamRouterSpec i.e. there is a deleted StreamPartition
for (StreamRouterSpec spec : removed) {
if (!copyRouteSpecMap.containsKey(spec.getPartition())
- || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
+ || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
LOG.error("Metadata calculation error: remove non-existing StreamRouterSpec " + spec);
} else {
inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
@@ -161,7 +150,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
// modified StreamRouterSpec, i.e. there is modified StreamPartition, for example WorkSlotQueue assignment is changed
for (StreamRouterSpec spec : modified) {
if (!copyRouteSpecMap.containsKey(spec.getPartition())
- || copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
+ || copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
LOG.error("Metadata calculation error: modify nonexisting StreamRouterSpec " + spec);
} else {
inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
@@ -207,9 +196,9 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
}
for (PolicyWorkerQueue pwq : streamRouterSpec.getTargetQueue()) {
routePartitioners.add(StreamRoutePartitionFactory.createRoutePartitioner(
- Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId),
- sds.get(streamRouterSpec.getPartition().getStreamId()),
- streamRouterSpec.getPartition()));
+ Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId),
+ sds.get(streamRouterSpec.getPartition().getStreamId()),
+ streamRouterSpec.getPartition()));
}
return routePartitioners;
}
@@ -219,7 +208,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto
synchronized (outputLock) {
this.streamContext.counter().incr("drop_count");
if (event.getAnchor() != null) {
- this.outputCollector.ack(event.getAnchor());
+ this.outputCollector.ack(event);
} else {
throw new IllegalStateException(event.toString() + " was not acked as anchor is null");
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index c946fee..02bc47e 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -38,6 +38,7 @@ import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrap
import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
+import org.apache.eagle.alert.engine.router.impl.StormOutputCollector;
import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
import org.apache.eagle.alert.engine.utils.SingletonExecutor;
import org.apache.eagle.alert.service.IMetadataServiceClient;
@@ -102,7 +103,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
pe.getEvent().setMetaVersion(specVersion);
} else if (streamEventVersion != null && !streamEventVersion.equals(specVersion)) {
if (specVersion != null && streamEventVersion != null
- && specVersion.contains("spec_version_") && streamEventVersion.contains("spec_version_")) {
+ && specVersion.contains("spec_version_") && streamEventVersion.contains("spec_version_")) {
// check if specVersion is older than stream_event_version
// Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]);
// Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]);
@@ -161,7 +162,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
// instantiate output lock object
outputLock = new Object();
streamContext = new StreamContextImpl(config, context.registerMetric("eagle.evaluator", new MultiCountMetric(), 60), context);
- alertOutputCollector = new AlertBoltOutputCollectorWrapper(collector, outputLock, streamContext);
+ alertOutputCollector = new AlertBoltOutputCollectorWrapper(new StormOutputCollector(collector), outputLock, streamContext);
policyGroupEvaluator.init(streamContext, alertOutputCollector);
metadataChangeNotifyService.registerListener(this);
metadataChangeNotifyService.init(config, MetadataType.ALERT_BOLT);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
index 7acd7e4..29ee771 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -36,6 +36,7 @@ import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
import org.apache.eagle.alert.engine.router.StreamRouter;
import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
+import org.apache.eagle.alert.engine.router.impl.StormOutputCollector;
import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl;
import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
@@ -69,7 +70,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
@Override
public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService changeNotifyService, Config config, TopologyContext context) {
streamContext = new StreamContextImpl(config, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context);
- routeCollector = new StreamRouterBoltOutputCollector(getBoltId(), collector, this.getOutputStreamIds(), streamContext, serializer);
+ routeCollector = new StreamRouterBoltOutputCollector(getBoltId(), new StormOutputCollector(collector, serializer), this.getOutputStreamIds(), streamContext);
router.prepare(streamContext, routeCollector);
changeNotifyService.registerListener(this);
changeNotifyService.init(config, MetadataType.STREAM_ROUTER_BOLT);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
index f356931..4552417 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
@@ -20,6 +20,7 @@ import backtype.storm.task.IOutputCollector;
import backtype.storm.task.OutputCollector;
import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorThreadSafeWrapper;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.router.impl.StormOutputCollector;
import org.junit.Assert;
import org.junit.Test;
@@ -32,7 +33,7 @@ public class AlertBoltOutputCollectorThreadSafeWrapperTest {
@Test
public void testThreadSafeAlertBoltOutputCollector() {
MockedStormAlertOutputCollector stormOutputCollector = new MockedStormAlertOutputCollector(null);
- AlertBoltOutputCollectorThreadSafeWrapper alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorThreadSafeWrapper(stormOutputCollector);
+ AlertBoltOutputCollectorThreadSafeWrapper alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorThreadSafeWrapper(new StormOutputCollector(stormOutputCollector));
alertBoltOutputCollectorWrapper.emit(create("mockAlert_1"));
alertBoltOutputCollectorWrapper.emit(create("mockAlert_2"));
Assert.assertEquals(0, stormOutputCollector.getCollected().size());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
index fd8ad61..704857d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java
@@ -34,6 +34,7 @@ import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.router.impl.StormOutputCollector;
import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector;
import org.junit.Assert;
import org.junit.Test;
@@ -53,7 +54,7 @@ public class TestStreamRouterBoltOutputCollector {
StreamPartition partition = new StreamPartition();
partition.setStreamId(streamId);
partition.setType(StreamPartition.Type.GROUPBY);
- partition.setColumns(new ArrayList<String>(){{
+ partition.setColumns(new ArrayList<String>() {{
add("col1");
}});
@@ -63,22 +64,26 @@ public class TestStreamRouterBoltOutputCollector {
PolicyWorkerQueue queue1 = new PolicyWorkerQueue();
queue1.setPartition(partition);
- queue1.setWorkers(new ArrayList<WorkSlot>(){ {
- add(worker1);
- }} );
+ queue1.setWorkers(new ArrayList<WorkSlot>() {
+ {
+ add(worker1);
+ }
+ });
PolicyWorkerQueue queue2 = new PolicyWorkerQueue();
queue2.setPartition(partition);
- queue2.setWorkers(new ArrayList<WorkSlot>(){ {
- add(worker1);
- add(worker2);
- }} );
+ queue2.setWorkers(new ArrayList<WorkSlot>() {
+ {
+ add(worker1);
+ add(worker2);
+ }
+ });
StreamRouterSpec spec1 = new StreamRouterSpec();
spec1.setStreamId(streamId);
spec1.setPartition(partition);
- spec1.setTargetQueue(new ArrayList<PolicyWorkerQueue>(){{
+ spec1.setTargetQueue(new ArrayList<PolicyWorkerQueue>() {{
add(queue1);
}});
@@ -86,7 +91,7 @@ public class TestStreamRouterBoltOutputCollector {
spec2.setStreamId(streamId);
spec2.setPartition(partition);
- spec2.setTargetQueue(new ArrayList<PolicyWorkerQueue>(){{
+ spec2.setTargetQueue(new ArrayList<PolicyWorkerQueue>() {{
add(queue2);
}});
@@ -138,7 +143,7 @@ public class TestStreamRouterBoltOutputCollector {
// create two events
StreamEvent event1 = new StreamEvent();
- Object[] data = new Object[] {"value1"};
+ Object[] data = new Object[]{"value1"};
event1.setData(data);
event1.setStreamId(streamId);
PartitionedEvent pEvent1 = new PartitionedEvent();
@@ -146,7 +151,7 @@ public class TestStreamRouterBoltOutputCollector {
pEvent1.setPartition(partition);
StreamEvent event2 = new StreamEvent();
- Object[] data2 = new Object[] {"value3"};
+ Object[] data2 = new Object[]{"value3"};
event2.setData(data2);
event2.setStreamId(streamId);
PartitionedEvent pEvent2 = new PartitionedEvent();
@@ -156,7 +161,7 @@ public class TestStreamRouterBoltOutputCollector {
TopologyContext context = Mockito.mock(TopologyContext.class);
when(context.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric());
StreamContext streamContext = new StreamContextImpl(null, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context);
- StreamRouterBoltOutputCollector collector = new StreamRouterBoltOutputCollector("test", new OutputCollector(delegate), null, streamContext, null);
+ StreamRouterBoltOutputCollector collector = new StreamRouterBoltOutputCollector("test", new StormOutputCollector(new OutputCollector(delegate), null), null, streamContext);
// add a StreamRouterSpec which has one worker
collector.onStreamRouterSpecChange(list1, new ArrayList<>(), new ArrayList<>(), sds);