You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:31 UTC
[23/84] [partial] eagle git commit: Clean repo for eagle site
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
deleted file mode 100644
index 87a067f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.template;
-
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.engine.coordinator.AlertDefinition;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.common.DateTimeUtil;
-import org.apache.velocity.Template;
-import org.apache.velocity.VelocityContext;
-import org.apache.velocity.app.Velocity;
-import org.apache.velocity.app.VelocityEngine;
-import org.apache.velocity.runtime.RuntimeConstants;
-import org.apache.velocity.runtime.resource.loader.StringResourceLoader;
-import org.apache.velocity.runtime.resource.util.StringResourceRepository;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Velocity-backed {@link AlertTemplateEngine}.
- *
- * <p>For every registered policy one subject template and one body template are stored in a
- * non-static Velocity string repository; {@link #filter(AlertStreamEvent)} renders both onto the event.</p>
- *
- * <p>{@link #register}, {@link #unregister}, {@link #filter} and {@link #getPolicies} synchronize on
- * this instance. {@link #init(Config)} does not, and it must have completed before the other methods
- * are called because it creates the engine and both repositories.</p>
- */
-public class VelocityAlertTemplateEngine implements AlertTemplateEngine {
-    private static final String ALERT_BODY_TPL_PREFIX = "AlertBodyTemplate";
-    private static final String ALERT_SUBJECT_TPL_PREFIX = "AlertSubjectTemplate";
-    private static final Logger LOG = LoggerFactory.getLogger(VelocityAlertTemplateEngine.class);
-    private StringResourceRepository stringResourceRepository;
-    private Map<String, PolicyDefinition> policyDefinitionRepository;
-    private VelocityEngine engine;
-
-    /**
-     * Creates and initializes the Velocity engine with a string resource loader, then creates the
-     * (initially empty) policy registry.
-     *
-     * @param config not read by this implementation
-     */
-    @Override
-    public void init(Config config) {
-        engine = new VelocityEngine();
-        engine.setProperty(RuntimeConstants.RUNTIME_LOG_LOGSYSTEM_CLASS, "org.apache.velocity.runtime.log.Log4JLogChute");
-        engine.setProperty("runtime.log.logsystem.log4j.logger", LOG.getName());
-        engine.setProperty(Velocity.RESOURCE_LOADER, "string");
-        engine.addProperty("string.resource.loader.class", StringResourceLoader.class.getName());
-        // Non-static repository: it is looked up from this engine's application attributes below.
-        engine.addProperty("string.resource.loader.repository.static", "false");
-        engine.init();
-
-        stringResourceRepository = (StringResourceRepository) engine.getApplicationAttribute(StringResourceLoader.REPOSITORY_NAME_DEFAULT);
-        policyDefinitionRepository = new HashMap<>();
-    }
-
-    /** Returns the repository key of the body template of the given policy. */
-    private String getAlertBodyTemplateName(String policyId) {
-        return String.format("%s:%s", ALERT_BODY_TPL_PREFIX, policyId);
-    }
-
-    /** Returns the repository key of the subject template of the given policy. */
-    private String getAlertSubjectTemplateName(String policyId) {
-        return String.format("%s:%s", ALERT_SUBJECT_TPL_PREFIX, policyId);
-    }
-
-    /**
-     * Stores the subject and body templates of a policy and remembers the policy itself.
-     *
-     * <p>If the policy has no alert definition, or a TEXT definition lacks a subject or a body,
-     * a default is used: the policy name as subject and the alert event reference as body.</p>
-     *
-     * @param policyDefinition policy to register; its name is used as the policy id
-     * @throws NullPointerException     if the policy name is null
-     * @throws IllegalArgumentException if the alert definition's template type is not TEXT
-     */
-    @Override
-    public synchronized void register(PolicyDefinition policyDefinition) {
-        // Validate first: the name is used for logging and as the key of everything stored below.
-        final String policyId = Preconditions.checkNotNull(policyDefinition.getName(), "policyId is null");
-        LOG.info("Registering {}", policyId);
-
-        final AlertDefinition alertDefinition = policyDefinition.getAlertDefinition();
-        final String subjectTemplate;
-        final String bodyTemplate;
-        if (alertDefinition == null) {
-            LOG.warn("Subject template of policy {} is null, using policy name by default", policyId);
-            subjectTemplate = policyId;
-
-            LOG.warn("Body template of policy {} is null, using $ALERT_EVENT by default", policyId);
-            bodyTemplate = String.format("Message: $%s (Auto-generated alert message as template not defined in policy %s)",
-                AlertContextFields.ALERT_EVENT, policyId);
-        } else if (AlertDefinition.TemplateType.TEXT.equals(alertDefinition.getTemplateType())) {
-            // Constant-first equals: a null template type falls through to the "unsupported" branch.
-            if (alertDefinition.getSubject() != null) {
-                subjectTemplate = alertDefinition.getSubject();
-            } else {
-                LOG.warn("Subject template of policy {} is null, using policy name by default", policyId);
-                subjectTemplate = policyId;
-            }
-            if (alertDefinition.getBody() != null) {
-                bodyTemplate = alertDefinition.getBody();
-            } else {
-                LOG.warn("Body template of policy {} is null, using ALERT_EVENT by default", policyId);
-                bodyTemplate = "$" + AlertContextFields.ALERT_EVENT;
-            }
-        } else {
-            throw new IllegalArgumentException("Unsupported alert template type " + alertDefinition.getTemplateType());
-        }
-
-        stringResourceRepository.putStringResource(getAlertSubjectTemplateName(policyId), subjectTemplate);
-        stringResourceRepository.putStringResource(getAlertBodyTemplateName(policyId), bodyTemplate);
-        policyDefinitionRepository.put(policyId, policyDefinition);
-    }
-
-    /**
-     * Removes both templates of the policy and forgets the policy.
-     *
-     * @param policyId id (name) the policy was registered under
-     */
-    @Override
-    public synchronized void unregister(String policyId) {
-        LOG.info("Unregistering {}", policyId);
-        stringResourceRepository.removeStringResource(getAlertBodyTemplateName(policyId));
-        stringResourceRepository.removeStringResource(getAlertSubjectTemplateName(policyId));
-        policyDefinitionRepository.remove(policyId);
-    }
-
-    /**
-     * Renders the body and then the subject template of the event's policy and sets both on the event.
-     *
-     * @param event alert event to enrich; modified in place
-     * @return the same {@code event} instance
-     * @throws IllegalArgumentException if the event's policy id has not been registered
-     */
-    @Override
-    public synchronized AlertStreamEvent filter(AlertStreamEvent event) {
-        Preconditions.checkArgument(this.policyDefinitionRepository.containsKey(event.getPolicyId()), "Unknown policyId " + event.getPolicyId());
-        PolicyDefinition policyDefinition = this.policyDefinitionRepository.get(event.getPolicyId());
-        VelocityContext alertContext = buildAlertContext(policyDefinition, event);
-        event.setBody(render(getAlertBodyTemplateName(event.getPolicyId()), alertContext));
-        event.setSubject(render(getAlertSubjectTemplateName(event.getPolicyId()), alertContext));
-        return event;
-    }
-
-    /** Merges the named template with the context and returns the rendered text. */
-    private String render(String templateName, VelocityContext context) {
-        // StringWriter holds no resource and its close() has no effect, so no try/finally is needed.
-        StringWriter writer = new StringWriter();
-        engine.getTemplate(templateName).merge(context, writer);
-        return writer.toString();
-    }
-
-    /**
-     * Returns the registered policies.
-     *
-     * @return the live {@code values()} view of the internal registry, not a copy
-     */
-    @Override
-    public synchronized Collection<PolicyDefinition> getPolicies() {
-        return policyDefinitionRepository.values();
-    }
-
-    /**
-     * Builds the template context: fixed alert and policy fields first, then every entry of the
-     * event's data map, so a data-map key equal to a fixed field name overrides that field.
-     */
-    private static VelocityContext buildAlertContext(PolicyDefinition policyDefinition, AlertStreamEvent event) {
-        VelocityContext context = new VelocityContext();
-        context.put(AlertContextFields.SITE_ID, event.getSiteId());
-        context.put(AlertContextFields.STREAM_ID, event.getStreamId());
-        context.put(AlertContextFields.ALERT_ID, event.getAlertId());
-        context.put(AlertContextFields.CREATED_BY, event.getCreatedBy());
-        context.put(AlertContextFields.CREATED_TIMESTAMP, event.getCreatedTime());
-        context.put(AlertContextFields.CREATED_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
-        context.put(AlertContextFields.ALERT_TIMESTAMP, event.getTimestamp());
-        context.put(AlertContextFields.ALERT_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getTimestamp()));
-        context.put(AlertContextFields.ALERT_SCHEMA, event.getSchema());
-        context.put(AlertContextFields.ALERT_EVENT, event);
-
-        context.put(AlertContextFields.POLICY_ID, policyDefinition.getName());
-        context.put(AlertContextFields.POLICY_DESC, policyDefinition.getDescription());
-        context.put(AlertContextFields.POLICY_TYPE, policyDefinition.getDefinition().getType());
-        context.put(AlertContextFields.POLICY_DEFINITION, policyDefinition.getDefinition().getValue());
-        context.put(AlertContextFields.POLICY_HANDLER, policyDefinition.getDefinition().getHandlerClass());
-
-        for (Map.Entry<String, Object> entry : event.getDataMap().entrySet()) {
-            context.put(entry.getKey(), entry.getValue());
-        }
-        return context;
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java
deleted file mode 100644
index a824a0d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.publisher.template;
-
-import org.apache.velocity.Template;
-import org.apache.velocity.VelocityContext;
-import org.apache.velocity.app.Velocity;
-import org.apache.velocity.app.VelocityEngine;
-import org.apache.velocity.exception.MethodInvocationException;
-import org.apache.velocity.exception.ParseErrorException;
-import org.apache.velocity.runtime.RuntimeConstants;
-import org.apache.velocity.runtime.parser.node.ASTReference;
-import org.apache.velocity.runtime.parser.node.ASTprocess;
-import org.apache.velocity.runtime.resource.loader.StringResourceLoader;
-import org.apache.velocity.runtime.resource.util.StringResourceRepository;
-import org.apache.velocity.runtime.visitor.NodeViewMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Parses a Velocity template string once, in strict-reference mode, and exposes the parsed
- * template together with the root names of the references found in it.
- */
-public class VelocityTemplateParser {
-    private static final Logger LOG = LoggerFactory.getLogger(VelocityTemplateParser.class);
-    private static final String TEMPLATE_NAME = "template";
-    private final Template template;
-    private final ParserNodeVisitor visitor;
-
-    /**
-     * Builds a dedicated engine, registers {@code templateString} under a fixed name, parses it
-     * and walks the resulting syntax tree to collect reference names.
-     *
-     * @param templateString Velocity template source
-     * @throws ParseErrorException if the template cannot be parsed
-     */
-    public VelocityTemplateParser(String templateString) throws ParseErrorException {
-        VelocityEngine engine = new VelocityEngine();
-        engine.setProperty(RuntimeConstants.RUNTIME_LOG_LOGSYSTEM_CLASS, "org.apache.velocity.runtime.log.Log4JLogChute");
-        engine.setProperty("runtime.log.logsystem.log4j.logger", LOG.getName());
-        engine.setProperty(Velocity.RESOURCE_LOADER, "string");
-        engine.addProperty("string.resource.loader.class", StringResourceLoader.class.getName());
-        engine.addProperty("string.resource.loader.repository.static", "false");
-        // Strict mode is what makes validateContext fail on references missing from the context.
-        engine.addProperty("runtime.references.strict", "true");
-        engine.init();
-        StringResourceRepository resourceRepository = (StringResourceRepository) engine.getApplicationAttribute(StringResourceLoader.REPOSITORY_NAME_DEFAULT);
-        resourceRepository.putStringResource(TEMPLATE_NAME, templateString);
-        template = engine.getTemplate(TEMPLATE_NAME);
-        ASTprocess data = (ASTprocess) template.getData();
-        visitor = new ParserNodeVisitor();
-        data.jjtAccept(visitor, null);
-    }
-
-    /**
-     * Returns the root name of every reference visited in the template.
-     *
-     * @return one entry per visited reference, in visit order (a name may repeat); this is the
-     *     visitor's own list, not a copy
-     */
-    public List<String> getReferenceNames() {
-        return this.visitor.getReferenceNames();
-    }
-
-    /** Returns the parsed template. */
-    public Template getTemplate() {
-        return template;
-    }
-
-    /**
-     * Renders the template against the given variables, discarding the output, so that a
-     * rendering failure surfaces as an exception.
-     *
-     * @param context variable names and values to render with
-     * @throws MethodInvocationException if required variable is missing in context.
-     */
-    public void validateContext(Map<String, Object> context) throws MethodInvocationException {
-        VelocityContext velocityContext = new VelocityContext();
-        for (Map.Entry<String, Object> entry : context.entrySet()) {
-            velocityContext.put(entry.getKey(), entry.getValue());
-        }
-        template.merge(velocityContext, new StringWriter());
-    }
-
-    /**
-     * Tree visitor that records the root string of each reference node before delegating to
-     * {@link NodeViewMode}. Static: it needs no access to the enclosing parser.
-     * NOTE(review): NodeViewMode looks like Velocity's tree-dumping visitor; confirm that any
-     * console output produced by {@code super.visit} is wanted.
-     */
-    private static final class ParserNodeVisitor extends NodeViewMode {
-        private final List<String> referenceNames = new ArrayList<>();
-
-        @Override
-        public Object visit(ASTReference node, Object data) {
-            referenceNames.add(node.getRootString());
-            return super.visit(node, data);
-        }
-
-        public List<String> getReferenceNames() {
-            return this.referenceNames;
-        }
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
deleted file mode 100644
index e1f3e9c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * Callback interface with a single method, {@code onAlertBoltSpecChange}, which receives an
- * {@link AlertBoltSpec} together with a map of {@link StreamDefinition}s keyed by string.
- * NOTE(review): the keys of {@code sds} are presumably stream ids; confirm against implementers.
- *
- * <p>Since 5/2/16.</p>
- */
-public interface AlertBoltSpecListener {
- void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds);
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
deleted file mode 100644
index 598ce18..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-import java.util.Map;
-
-/**
- * Callback interface with a single method, {@code onSpoutSpecChange}, which receives a
- * {@link SpoutSpec} together with a map of {@link StreamDefinition}s keyed by string.
- * NOTE(review): the keys of {@code sds} are presumably stream ids; confirm against implementers.
- *
- * <p>Since 5/3/16.</p>
- */
-public interface SpoutSpecListener {
- void onSpoutSpecChange(SpoutSpec spec, Map<String, StreamDefinition> sds);
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
deleted file mode 100644
index 88ffadb..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-import java.util.List;
-
-
-/**
- * Output abstraction offering two emit operations (a {@link PartitionedEvent} addressed to a
- * stream id, or a raw tuple given as a list of objects) plus ack and fail for a partitioned event.
- * Only the stream-id variant of emit declares a checked exception.
- * NOTE(review): ack/fail presumably mirror the acknowledgement calls of the underlying
- * streaming runtime; confirm against implementers.
- */
-public interface StreamOutputCollector {
- void emit(String streamId, PartitionedEvent partitionedEvent) throws Exception;
-
- void emit(List<Object> tuple);
-
- void ack(PartitionedEvent partitionedEvent);
-
- void fail(PartitionedEvent partitionedEvent);
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
deleted file mode 100644
index 049e852..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import java.io.Serializable;
-
-/**
- * <b></b>
- * 1. Group by SingleStream[stream_1.col1]
- *
- * <p>Shuffle(stream_1,[col1])</p>
- *
- * <b></b>
- * 2. Group by SingleStream[stream_1.col1,stream_1.col2]
- *
- * <p>Shuffle(stream_1,[col1,col2])</p>
- *
- * <b></b>
- * 3. Group by JoinedStream[stream_1.col1,stream_1.col2,stream_2.col3]
- * <p>Shuffle(stream_1.col1,stream_1.col2) + Global(stream_2.col3)</p>
- */
-public class StreamRoute implements Serializable {
- private static final long serialVersionUID = 4649184902196034940L;
-
- private String targetComponentId;
- private int partitionKey;
- private String partitionType;
-
- public String getTargetComponentId() {
- return targetComponentId;
- }
-
- public void setTargetComponentId(String targetComponentId) {
- this.targetComponentId = targetComponentId;
- }
-
- public StreamRoute(String targetComponentId, int partitionKey, StreamPartition.Type type) {
- this.setTargetComponentId(targetComponentId);
- this.setPartitionKey(partitionKey);
- this.setPartitionType(type);
- }
-
- public int getPartitionKey() {
- return partitionKey;
- }
-
- public void setPartitionKey(int partitionKey) {
- this.partitionKey = partitionKey;
- }
-
- public StreamPartition.Type getPartitionType() {
- return StreamPartition.Type.valueOf(partitionType);
- }
-
- public void setPartitionType(StreamPartition.Type partitionType) {
- this.partitionType = partitionType.name();
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(partitionKey).append(partitionType).append(targetComponentId).build();
- }
-
- @Override
- public String toString() {
- return String.format("Route[target=%s, key=%s, type=%s]", this.targetComponentId, this.partitionKey, this.partitionType);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
deleted file mode 100644
index 0d397e4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.router.impl.BasicStreamRoutePartitioner;
-
-import java.util.List;
-
-public class StreamRoutePartitionFactory {
- /**
- * Creates a {@link StreamRoutePartitioner}; currently always a {@link BasicStreamRoutePartitioner}
- * built from the three arguments, which are passed through unchanged.
- *
- * <p>TODO: Decouple different StreamRoutePartitioner implementation from BasicStreamRoutePartitioner.</p>
- *
- * @param outputComponentIds ids handed to the partitioner's constructor
- * @param streamDefinition stream definition handed to the partitioner's constructor
- * @param partition stream partition handed to the partitioner's constructor
- * @return a new {@link BasicStreamRoutePartitioner}
- */
- public static StreamRoutePartitioner createRoutePartitioner(List<String> outputComponentIds, StreamDefinition streamDefinition, StreamPartition partition) {
- return new BasicStreamRoutePartitioner(outputComponentIds, streamDefinition, partition);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
deleted file mode 100644
index 5b5632d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-import java.util.List;
-
-/**
- * Maps a single {@link StreamEvent} to a list of {@link StreamRoute}s.
- * NOTE(review): whether the result may be empty or null is not specified here; check implementers.
- */
-public interface StreamRoutePartitioner {
- List<StreamRoute> partition(StreamEvent event);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
deleted file mode 100644
index dfd2cc4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Callback interface with a single method, {@code onStreamRouterSpecChange}, which receives three
- * collections of {@link StreamRouterSpec} (added, removed and modified) and a map of
- * {@link StreamDefinition}s keyed by string.
- * NOTE(review): the keys of {@code sds} are presumably stream ids; confirm against implementers.
- */
-public interface StreamRouteSpecListener {
- void onStreamRouterSpecChange(Collection<StreamRouterSpec> added,
- Collection<StreamRouterSpec> removed,
- Collection<StreamRouterSpec> modified,
- Map<String, StreamDefinition> sds);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
deleted file mode 100644
index a9efc97..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-import java.io.Serializable;
-
-/**
- * Serializable router contract that also acts as a {@link StreamSortSpecListener}. It is prepared
- * with a {@link StreamContext} and a {@link PartitionedEventCollector}, is fed
- * {@link PartitionedEvent}s one at a time, exposes a name and can be closed.
- * NOTE(review): the expected call order (prepare, then nextEvent, then close) is an assumption
- * based on the method names; confirm against implementers.
- */
-public interface StreamRouter extends StreamSortSpecListener, Serializable {
- void prepare(StreamContext context, PartitionedEventCollector outputCollector);
-
- void nextEvent(PartitionedEvent event);
-
- String getName();
-
- void close();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
deleted file mode 100644
index 0016fc0..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-import java.util.Map;
-
-/**
- * Listen to change on StreamRouterBoltSpec.
- * @since 5/1/16.
- */
-public interface StreamRouterBoltSpecListener {
- /**
- * Callback carrying a router spec together with a map of stream definitions.
- * NOTE(review): the map key is presumably the stream id - confirm against callers.
- *
- * @param spec the router spec being reported
- * @param sds stream definitions keyed by a string identifier
- */
- void onStreamRouteBoltSpecChange(RouterSpec spec, Map<String, StreamDefinition> sds);
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
deleted file mode 100644
index 613ab7f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener;
-
-/**
- * Contract for a per-stream sort handler: it is prepared with a stream id, a sort spec and a
- * collector, receives {@link PartitionedEvent}s, and can be closed. It also inherits the
- * callbacks of {@link StreamTimeClockListener}.
- */
-public interface StreamSortHandler extends StreamTimeClockListener {
-
- /**
- * Hands the handler the stream id, sort spec and collector it works with.
- *
- * @param streamId id of the stream this handler is prepared for
- * @param streamSortSpecSpec sort specification supplied for that stream
- * @param outputCollector collector supplied to the handler (presumably receives sorted events - confirm)
- */
- void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector);
-
- /**
- * Feeds one event to the handler.
- *
- * @param event the partitioned event to process
- */
- void nextEvent(PartitionedEvent event);
-
- /**
- * Closes the handler.
- */
- void close();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
deleted file mode 100644
index 087a46f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-
-import java.util.Map;
-
-/**
- * Listener notified about {@link StreamSortSpec} changes, reported as three maps keyed by
- * {@link StreamPartition}.
- */
-public interface StreamSortSpecListener {
- /**
- * Callback reporting sort-spec changes.
- * NOTE(review): whether the maps may be null or empty is not visible here - confirm against callers.
- *
- * @param added sort specs reported as added, by partition
- * @param removed sort specs reported as removed, by partition
- * @param changed sort specs reported as changed, by partition
- */
- void onStreamSortSpecChange(Map<StreamPartition, StreamSortSpec> added,
- Map<StreamPartition, StreamSortSpec> removed,
- Map<StreamPartition, StreamSortSpec> changed);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
deleted file mode 100644
index a97e1da..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.StreamRoute;
-import org.apache.eagle.alert.engine.router.StreamRoutePartitioner;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * {@link StreamRoutePartitioner} that selects target components for an event according to the
- * {@link StreamPartition.Type} of the configured partition:
- * <ul>
- * <li>GLOBAL - one route per output component;</li>
- * <li>GROUPBY - a single route chosen from the hash of the partition columns;</li>
- * <li>anything else - a single route chosen from the current time (shuffle).</li>
- * </ul>
- */
-public class BasicStreamRoutePartitioner implements StreamRoutePartitioner {
-    private final List<String> outputComponentIds;
-    private final StreamDefinition streamDefinition;
-    private final StreamPartition streamPartition;
-
-    /** Lazily built list of GLOBAL routes, one per output component; null until first use. */
-    private List<StreamRoute> globalRoutingKeys = null;
-
-    /**
-     * @param outputComponentIds ids of the candidate target components
-     * @param streamDefinition   definition used to read the partition columns from an event
-     * @param partition          partition whose type and columns drive the routing
-     */
-    public BasicStreamRoutePartitioner(List<String> outputComponentIds, StreamDefinition streamDefinition, StreamPartition partition) {
-        this.outputComponentIds = outputComponentIds;
-        this.streamDefinition = streamDefinition;
-        this.streamPartition = partition;
-    }
-
-    /**
-     * Computes the routes for one event based on the partition type.
-     *
-     * @param event event to route
-     * @return the routes the event should be sent to
-     */
-    @Override
-    public List<StreamRoute> partition(StreamEvent event) {
-        switch (this.streamPartition.getType()) {
-            case GLOBAL:
-                return routeToAll(event);
-            case GROUPBY:
-                return routeByGroupByKey(event);
-            default:
-                return routeByShuffle(event);
-        }
-    }
-
-    /**
-     * Routes to exactly one component selected by the hash of the partition-column values.
-     * The (possibly negative) hash itself is carried in the route as the partition key.
-     */
-    protected List<StreamRoute> routeByGroupByKey(StreamEvent event) {
-        int partitionKey = new HashCodeBuilder().append(event.getData(streamDefinition, this.streamPartition.getColumns())).build();
-        // Take the remainder first and the absolute value second: Math.abs(Integer.MIN_VALUE) is
-        // still negative, so abs-then-modulo could produce a negative index. For every other key
-        // this selects the same component as before.
-        int index = Math.abs(partitionKey % this.outputComponentIds.size());
-        String selectedOutputStream = outputComponentIds.get(index);
-        return Collections.singletonList(new StreamRoute(selectedOutputStream, partitionKey, StreamPartition.Type.GROUPBY));
-    }
-
-    /**
-     * Routes to exactly one component selected from the current wall-clock time; the route
-     * carries -1 as its partition key.
-     */
-    protected List<StreamRoute> routeByShuffle(StreamEvent event) {
-        int hash = (int) System.currentTimeMillis();
-        // Same overflow-safe ordering as in routeByGroupByKey: the truncated timestamp may be
-        // Integer.MIN_VALUE.
-        int index = Math.abs(hash % outputComponentIds.size());
-        return Arrays.asList(new StreamRoute(outputComponentIds.get(index), -1, StreamPartition.Type.SHUFFLE));
-    }
-
-    /**
-     * Routes to every output component. The route list is built on first use and then reused;
-     * each route carries -1 as its partition key.
-     */
-    protected List<StreamRoute> routeToAll(StreamEvent event) {
-        // Build the cache only when it does not exist yet. The previous inverted check
-        // (!= null) never initialised it, so this method always returned null.
-        if (globalRoutingKeys == null) {
-            globalRoutingKeys = new ArrayList<>();
-            for (String targetId : outputComponentIds) {
-                globalRoutingKeys.add(new StreamRoute(targetId, -1, StreamPartition.Type.GLOBAL));
-            }
-        }
-        return globalRoutingKeys;
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
deleted file mode 100644
index 5c10675..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.trident.partition.GlobalGrouping;
-
-import java.util.*;
-
-public class RoutePhysicalGrouping implements CustomStreamGrouping {
- private static final long serialVersionUID = -511915083994148362L;
- private static final Logger LOG = LoggerFactory.getLogger(RoutePhysicalGrouping.class);
- private List<Integer> outdegreeTasks;
- private ShuffleGrouping shuffleGroupingDelegate;
- private GlobalGrouping globalGroupingDelegate;
- private Map<String, Integer> connectedTargetIds;
-
- @Override
- public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
- this.outdegreeTasks = new ArrayList<>(targetTasks);
- shuffleGroupingDelegate = new ShuffleGrouping();
- shuffleGroupingDelegate.prepare(context, stream, targetTasks);
- globalGroupingDelegate = new GlobalGrouping();
- globalGroupingDelegate.prepare(context, stream, targetTasks);
- connectedTargetIds = new HashMap<>();
- for (Integer targetId : targetTasks) {
- String targetComponentId = context.getComponentId(targetId);
- connectedTargetIds.put(targetComponentId, targetId);
- }
- LOG.info("OutDegree components: [{}]", StringUtils.join(connectedTargetIds.values(), ","));
- }
-
- @Override
- public List<Integer> chooseTasks(int taskId, List<Object> values) {
- Object routingKeyObj = values.get(0);
- if (routingKeyObj != null) {
- PartitionedEvent partitionedEvent = (PartitionedEvent) routingKeyObj;
- if (partitionedEvent.getPartition().getType() == StreamPartition.Type.GLOBAL) {
- return globalGroupingDelegate.chooseTasks(taskId, values);
- } else if (partitionedEvent.getPartition().getType() == StreamPartition.Type.GROUPBY) {
- return Collections.singletonList(outdegreeTasks.get((int) (partitionedEvent.getPartitionKey() % this.outdegreeTasks.size())));
- }
- // Shuffle by defaults
- return shuffleGroupingDelegate.chooseTasks(taskId, values);
- }
-
- LOG.warn("Illegal null StreamRoute, throw event");
- return Collections.emptyList();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
deleted file mode 100644
index 752c742..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * NOTE: This is copy from storm 1.0.0 code. DON'T modify it.
- *
- * <p>Custom grouping that hands out target tasks one at a time from a shuffled list and
- * reshuffles the list each time the shared counter wraps around.
- *
- * @since May 4, 2016
- */
-public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
- private static final long serialVersionUID = 5035497345182141765L;
- private Random random;
- // One single-element list per target task, kept in shuffled order.
- private ArrayList<List<Integer>> choices;
- // Shared position counter advanced on every chooseTasks call.
- private AtomicInteger current;
-
- /**
- * Builds one singleton list per target task, shuffles them and resets the counter to 0.
- *
- * @param context worker topology context (not used here)
- * @param stream stream being grouped (not used here)
- * @param targetTasks ids of the candidate target tasks
- */
- @Override
- public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
- random = new Random();
- choices = new ArrayList<List<Integer>>(targetTasks.size());
- for (Integer i : targetTasks) {
- choices.add(Arrays.asList(i));
- }
- Collections.shuffle(choices, random);
- current = new AtomicInteger(0);
- }
-
- /**
- * Returns the next single-task list.
- *
- * @param taskId id of the emitting task (not used here)
- * @param values tuple values (not used here)
- * @return a one-element list holding the chosen target task id
- */
- @Override
- public List<Integer> chooseTasks(int taskId, List<Object> values) {
- int rightNow;
- int size = choices.size();
- while (true) {
- // Atomically claim the next position.
- rightNow = current.incrementAndGet();
- if (rightNow < size) {
- return choices.get(rightNow);
- } else if (rightNow == size) {
- // This caller hit the end exactly: reset the counter, reshuffle and serve slot 0.
- current.set(0);
- //This should be thread safe so long as ArrayList does not have any internal state that can be messed up by multi-threaded access.
- Collections.shuffle(choices, random);
- return choices.get(0);
- }
- //race condition with another thread, and we lost
- // try again
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
deleted file mode 100644
index 7b8f344..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.router.impl;
-
-import backtype.storm.task.OutputCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.router.StreamOutputCollector;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * {@link StreamOutputCollector} backed by a Storm {@link OutputCollector}. Events are emitted
- * anchored to their own anchor; when a serializer is configured the serialized form is emitted
- * instead of the event object itself.
- */
-public class StormOutputCollector implements StreamOutputCollector {
-
-    private final OutputCollector outputCollector;
-    private final PartitionedEventSerializer serializer;
-
-    /**
-     * Creates a collector that emits event objects as they are (no serializer).
-     */
-    public StormOutputCollector(OutputCollector outputCollector) {
-        this(outputCollector, null);
-    }
-
-    /**
-     * @param outputCollector Storm collector every call is forwarded to
-     * @param serializer      serializer applied before emitting, or null to emit the event itself
-     */
-    public StormOutputCollector(OutputCollector outputCollector, PartitionedEventSerializer serializer) {
-        this.outputCollector = outputCollector;
-        this.serializer = serializer;
-    }
-
-    /**
-     * Emits the event (or its serialized form) to the given stream, anchored to the event's anchor.
-     */
-    @Override
-    public void emit(String streamId, PartitionedEvent partitionedEvent) throws Exception {
-        this.outputCollector.emit(
-            streamId,
-            Collections.singletonList(partitionedEvent.getAnchor()),
-            Collections.singletonList(this.serializer != null ? this.serializer.serialize(partitionedEvent) : partitionedEvent));
-    }
-
-    /**
-     * Emits a raw tuple to the underlying collector.
-     */
-    @Override
-    public void emit(List<Object> tuple) {
-        this.outputCollector.emit(tuple);
-    }
-
-    /**
-     * Acks the anchor of the given event.
-     */
-    @Override
-    public void ack(PartitionedEvent partitionedEvent) {
-        this.outputCollector.ack(partitionedEvent.getAnchor());
-    }
-
-    /**
-     * Fails the anchor of the given event.
-     */
-    @Override
-    public void fail(PartitionedEvent partitionedEvent) {
-        this.outputCollector.fail(partitionedEvent.getAnchor());
-    }
-}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
deleted file mode 100644
index 2eb101a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import com.google.common.collect.Lists;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.*;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * After sorting, one stream's message will be routed based on its StreamPartition.
- * One stream may have multiple StreamPartitions based on how this stream is grouped by.
- *
- * <p>Routing metadata is kept in two volatile maps that are replaced wholesale by
- * {@link #onStreamRouterSpecChange}; {@link #emit} only reads them. All calls into the
- * underlying {@link StreamOutputCollector} are serialized on a private lock.
- *
- * <p>TODO: Add metric statistics
- */
-public class StreamRouterBoltOutputCollector implements PartitionedEventCollector, StreamRouteSpecListener {
-    private static final Logger LOG = LoggerFactory.getLogger(StreamRouterBoltOutputCollector.class);
-    private final StreamOutputCollector outputCollector;
-    private final Object outputLock = new Object();
-    // private final List<String> outputStreamIds;
-    private final StreamContext streamContext;
-    private volatile Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap;
-    private volatile Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap;
-    private final String sourceId;
-
-    /**
-     * @param sourceId        id of the emitting component, used to derive target stream ids
-     * @param outputCollector collector events are emitted, acked and failed through
-     * @param outputStreamIds currently unused
-     * @param streamContext   context providing the counters updated by this collector
-     */
-    public StreamRouterBoltOutputCollector(String sourceId, StreamOutputCollector outputCollector, List<String> outputStreamIds, StreamContext streamContext) {
-        this.sourceId = sourceId;
-        this.outputCollector = outputCollector;
-        this.routeSpecMap = new HashMap<>();
-        this.routePartitionerMap = new HashMap<>();
-        // this.outputStreamIds = outputStreamIds;
-        this.streamContext = streamContext;
-    }
-
-    /**
-     * Routes one event: drops it when its partition has no router spec, fails it when no
-     * partitioner is registered or when emitting throws, otherwise emits it once per computed
-     * route and acks it.
-     *
-     * @param event the event to route
-     */
-    public void emit(PartitionedEvent event) {
-        try {
-            this.streamContext.counter().incr("send_count");
-            StreamPartition partition = event.getPartition();
-            List<StreamRouterSpec> routerSpecs = routeSpecMap.get(partition);
-            if (routerSpecs == null || routerSpecs.isEmpty()) {
-                if (LOG.isDebugEnabled()) {
-                    // Don't know how to route stream, if it's correct, it's better to filter useless stream in spout side
-                    LOG.debug("Drop event {} as StreamPartition {} is not pointed to any router metadata {}", event, event.getPartition(), routeSpecMap);
-                }
-                this.drop(event);
-                return;
-            }
-
-            // Read the volatile map exactly once: a concurrent spec change may swap it between a
-            // null check and a second lookup.
-            List<StreamRoutePartitioner> queuePartitioners = routePartitionerMap.get(partition);
-            if (queuePartitioners == null) {
-                LOG.error("Partitioner for " + routerSpecs.get(0) + " is null");
-                synchronized (outputLock) {
-                    this.streamContext.counter().incr("fail_count");
-                    this.outputCollector.fail(event);
-                }
-                return;
-            }
-
-            StreamEvent newEvent = event.getEvent().copy();
-
-            synchronized (outputLock) {
-                for (StreamRoutePartitioner queuePartitioner : queuePartitioners) {
-                    List<StreamRoute> streamRoutes = queuePartitioner.partition(newEvent);
-                    // it is possible that one event can be sent to multiple slots in one slotqueue if that is All grouping
-                    for (StreamRoute streamRoute : streamRoutes) {
-                        String targetStreamId = StreamIdConversion.generateStreamIdBetween(sourceId, streamRoute.getTargetComponentId());
-                        try {
-                            PartitionedEvent emittedEvent = new PartitionedEvent(newEvent, partition, streamRoute.getPartitionKey());
-                            // Route Target Stream id instead of component id
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Emitted to stream {} with message {}", targetStreamId, emittedEvent);
-                            }
-                            // NOTE(review): emittedEvent (carrying the route's partition key) is only
-                            // logged; the incoming event is what is actually emitted. Confirm whether
-                            // emittedEvent, with the anchor copied over, was intended here.
-                            outputCollector.emit(targetStreamId, event);
-                            this.streamContext.counter().incr("emit_count");
-                        } catch (RuntimeException ex) {
-                            // Log with the target stream for context and rethrow; the outer handler
-                            // counts the failure, so fail_count is incremented once per failed event.
-                            LOG.error("Failed to emit to {} with {}", targetStreamId, newEvent, ex);
-                            throw ex;
-                        }
-                    }
-                }
-                outputCollector.ack(event);
-            }
-        } catch (Exception ex) {
-            LOG.error(ex.getMessage(), ex);
-            synchronized (outputLock) {
-                this.streamContext.counter().incr("fail_count");
-                this.outputCollector.fail(event);
-            }
-        }
-    }
-
-    /**
-     * Applies router spec changes to private copies of the routing maps and then publishes the
-     * copies. Inconsistent input (adding an existing spec, removing or modifying an unknown one)
-     * is logged and skipped.
-     *
-     * @param added    specs to add
-     * @param removed  specs to remove
-     * @param modified specs to replace
-     * @param sds      stream definitions keyed by stream id
-     */
-    @Override
-    public void onStreamRouterSpecChange(Collection<StreamRouterSpec> added,
-                                         Collection<StreamRouterSpec> removed,
-                                         Collection<StreamRouterSpec> modified,
-                                         Map<String, StreamDefinition> sds) {
-        // Copy the value lists as well as the maps: the updates below append to those lists, and
-        // the lists of the currently published maps may be iterated by emit() at the same time.
-        Map<StreamPartition, List<StreamRouterSpec>> copyRouteSpecMap = copyWithListValues(routeSpecMap);
-        Map<StreamPartition, List<StreamRoutePartitioner>> copyRoutePartitionerMap = copyWithListValues(routePartitionerMap);
-
-        // added StreamRouterSpec i.e. there is a new StreamPartition
-        for (StreamRouterSpec spec : added) {
-            if (copyRouteSpecMap.containsKey(spec.getPartition())
-                && copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
-                LOG.error("Metadata calculation error: add existing StreamRouterSpec " + spec);
-            } else {
-                inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
-            }
-        }
-
-        // removed StreamRouterSpec i.e. there is a deleted StreamPartition
-        for (StreamRouterSpec spec : removed) {
-            if (!copyRouteSpecMap.containsKey(spec.getPartition())
-                || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
-                LOG.error("Metadata calculation error: remove non-existing StreamRouterSpec " + spec);
-            } else {
-                inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
-            }
-        }
-
-        // modified StreamRouterSpec, i.e. there is modified StreamPartition, for example WorkSlotQueue assignment is changed
-        for (StreamRouterSpec spec : modified) {
-            if (!copyRouteSpecMap.containsKey(spec.getPartition())
-                || copyRouteSpecMap.get(spec.getPartition()).contains(spec)) {
-                LOG.error("Metadata calculation error: modify nonexisting StreamRouterSpec " + spec);
-            } else {
-                inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
-                inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
-            }
-        }
-
-        // switch
-        routeSpecMap = copyRouteSpecMap;
-        routePartitionerMap = copyRoutePartitionerMap;
-    }
-
-    /** Returns a new map whose list values are fresh copies of the source's lists. */
-    private static <T> Map<StreamPartition, List<T>> copyWithListValues(Map<StreamPartition, List<T>> source) {
-        Map<StreamPartition, List<T>> copy = new HashMap<>();
-        for (Map.Entry<StreamPartition, List<T>> entry : source.entrySet()) {
-            copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
-        }
-        return copy;
-    }
-
-    /** Removes everything registered for the spec's partition from both maps. */
-    private void inplaceRemove(Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap,
-                               Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
-                               StreamRouterSpec toBeRemoved) {
-        routeSpecMap.remove(toBeRemoved.getPartition());
-        routePartitionerMap.remove(toBeRemoved.getPartition());
-    }
-
-    /**
-     * Registers the spec and its partitioners under the spec's partition. If the partitioners
-     * cannot be created, the whole partition is removed from both maps and the error is logged.
-     */
-    private void inplaceAdd(Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap,
-                            Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
-                            StreamRouterSpec toBeAdded, Map<String, StreamDefinition> sds) {
-        if (!routeSpecMap.containsKey(toBeAdded.getPartition())) {
-            routeSpecMap.put(toBeAdded.getPartition(), new ArrayList<StreamRouterSpec>());
-        }
-        routeSpecMap.get(toBeAdded.getPartition()).add(toBeAdded);
-        try {
-            List<StreamRoutePartitioner> routePartitioners = calculatePartitioner(toBeAdded, sds, routePartitionerMap);
-            routePartitionerMap.put(toBeAdded.getPartition(), routePartitioners);
-        } catch (Exception e) {
-            LOG.error("ignore this failure StreamRouterSpec " + toBeAdded + ", with error" + e.getMessage(), e);
-            routeSpecMap.remove(toBeAdded.getPartition());
-            routePartitionerMap.remove(toBeAdded.getPartition());
-        }
-    }
-
-    /**
-     * Appends one partitioner per target queue of the spec to the partition's partitioner list
-     * (creating the list when the partition has none yet) and returns that list.
-     */
-    private List<StreamRoutePartitioner> calculatePartitioner(StreamRouterSpec streamRouterSpec,
-                                                              Map<String, StreamDefinition> sds,
-                                                              Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap) throws Exception {
-        List<StreamRoutePartitioner> routePartitioners = routePartitionerMap.get(streamRouterSpec.getPartition());
-        if (routePartitioners == null) {
-            routePartitioners = new ArrayList<>();
-        }
-        for (PolicyWorkerQueue pwq : streamRouterSpec.getTargetQueue()) {
-            routePartitioners.add(StreamRoutePartitionFactory.createRoutePartitioner(
-                Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId),
-                sds.get(streamRouterSpec.getPartition().getStreamId()),
-                streamRouterSpec.getPartition()));
-        }
-        return routePartitioners;
-    }
-
-    /**
-     * Counts the event as dropped and acks it.
-     *
-     * @throws IllegalStateException if the event has no anchor and therefore cannot be acked
-     */
-    @Override
-    public void drop(PartitionedEvent event) {
-        synchronized (outputLock) {
-            this.streamContext.counter().incr("drop_count");
-            if (event.getAnchor() != null) {
-                this.outputCollector.ack(event);
-            } else {
-                throw new IllegalStateException(event.toString() + " was not acked as anchor is null");
-            }
-        }
-    }
-
-    /** No-op: nothing is buffered by this collector. */
-    public void flush() {
-
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
deleted file mode 100644
index 41523cc..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.router.StreamRouter;
-import org.apache.eagle.alert.engine.router.StreamSortHandler;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClockManager;
-import org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl;
-import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockManagerImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * Routes each {@link PartitionedEvent} either to the {@link StreamSortHandler} registered for the
- * event's partition or, when none applies, straight to the output collector.
- *
- * <p>{@link #prepare(StreamContext, PartitionedEventCollector)} creates the clock manager and the
- * handler map, so it must be called before events are routed or sort specs are changed.
- *
- * <p>The handler map is never modified in place by {@link #onStreamSortSpecChange(Map, Map, Map)}:
- * a copy is edited and then assigned to the volatile field in one step.
- */
-public class StreamRouterImpl implements StreamRouter {
-    private static final long serialVersionUID = -4640125063690900014L;
-    private static final Logger LOG = LoggerFactory.getLogger(StreamRouterImpl.class);
-    private final String name;
-    private volatile Map<StreamPartition, StreamSortHandler> streamSortHandlers;
-    private PartitionedEventCollector outputCollector;
-    private StreamTimeClockManager streamTimeClockManager;
-    private StreamContext context;
-
-    /**
-     * @param name This name should be formed by topologyId + router id, which is built by topology builder.
-     */
-    public StreamRouterImpl(String name) {
-        this.name = name;
-    }
-
-    /**
-     * @return the name given to the constructor.
-     */
-    public String getName() {
-        return this.name;
-    }
-
-    /**
-     * Closes every registered sort handler and then the clock manager.
-     *
-     * <p>Safe to call even if {@code prepare} was never invoked: both fields are only assigned
-     * there, so they are checked for {@code null} first.
-     */
-    @Override
-    public void close() {
-        // Single read of the volatile reference.
-        final Map<StreamPartition, StreamSortHandler> handlers = this.streamSortHandlers;
-        if (handlers != null) {
-            handlers.values().forEach(StreamSortHandler::close);
-        }
-        if (streamTimeClockManager != null) {
-            streamTimeClockManager.close();
-        }
-    }
-
-    /**
-     * Initializes the router: creates a new clock manager and an empty handler map, and stores
-     * the context and collector for later use.
-     *
-     * @param context         stream context whose counter is updated while routing
-     * @param outputCollector collector that receives directly emitted events and is handed to
-     *                        every sort handler
-     */
-    public void prepare(StreamContext context, PartitionedEventCollector outputCollector) {
-        this.streamTimeClockManager = new StreamTimeClockManagerImpl();
-        this.streamSortHandlers = new HashMap<>();
-        this.outputCollector = outputCollector;
-        this.context = context;
-    }
-
-    /**
-     * Routes one event and then advances the stream clock.
-     *
-     * <p>Counters: {@code receive_count} for every event, then exactly one of {@code sort_count}
-     * (event handed to a sort handler) or {@code direct_count} (event emitted directly).
-     *
-     * <p>TODO: Potential improvement: if StreamSortHandler is expensive, we can use DISRUPTOR to buffer.
-     *
-     * @param event StreamEvent
-     */
-    public void nextEvent(PartitionedEvent event) {
-        this.context.counter().incr("receive_count");
-        if (dispatchToSortHandler(event)) {
-            // Only events that actually went through a sort handler count as sorted.
-            this.context.counter().incr("sort_count");
-        } else {
-            this.context.counter().incr("direct_count");
-            // Pass through directly if no need to sort
-            outputCollector.emit(event);
-        }
-        // Update stream clock time if moving forward and trigger all tick listeners
-        streamTimeClockManager.onTimeUpdate(event.getStreamId(), event.getTimestamp());
-    }
-
-    /**
-     * Hands the event to the sort handler of its partition, if there is one.
-     *
-     * <p>Events with a non-positive timestamp are never dispatched. When no handler is registered
-     * but the event reports that sorting is required, a warning is logged and
-     * {@code miss_sort_count} is incremented.
-     *
-     * @param event input event.
-     * @return whether sorted.
-     */
-    private boolean dispatchToSortHandler(PartitionedEvent event) {
-        if (event.getTimestamp() <= 0) {
-            return false;
-        }
-
-        StreamSortHandler sortHandler = streamSortHandlers.get(event.getPartition());
-        if (sortHandler == null) {
-            if (event.isSortRequired()) {
-                LOG.warn("Stream sort handler required has not been loaded so emmit directly: {}", event);
-                this.context.counter().incr("miss_sort_count");
-            }
-            return false;
-        }
-        sortHandler.nextEvent(event);
-        return true;
-    }
-
-    /**
-     * Applies added, removed and changed sort specs (in that order) to a copy of the handler map
-     * and then replaces the live map with the copy.
-     *
-     * <p>Requires {@code prepare} to have been called: the method synchronizes on the clock
-     * manager created there. Inconsistent input (adding an existing partition, removing or
-     * changing an unknown one) is logged as an error and otherwise ignored.
-     *
-     * @param added   specs for partitions that have no handler yet; may be {@code null}
-     * @param removed specs for partitions whose handler is to be closed and dropped; may be {@code null}
-     * @param changed specs for partitions whose handler is to be replaced; may be {@code null}
-     */
-    @Override
-    public void onStreamSortSpecChange(Map<StreamPartition, StreamSortSpec> added,
-                                       Map<StreamPartition, StreamSortSpec> removed,
-                                       Map<StreamPartition, StreamSortSpec> changed) {
-        synchronized (streamTimeClockManager) {
-            Map<StreamPartition, StreamSortHandler> copy = new HashMap<>(this.streamSortHandlers);
-            // add new StreamSortSpec
-            if (added != null && !added.isEmpty()) {
-                for (Entry<StreamPartition, StreamSortSpec> spec : added.entrySet()) {
-                    StreamPartition tmp = spec.getKey();
-                    if (copy.containsKey(tmp)) {
-                        LOG.error("Metadata calculation error: Duplicated StreamSortSpec " + spec);
-                    } else {
-                        StreamSortHandler handler = createPreparedHandler(tmp, spec.getValue());
-                        copy.put(tmp, handler);
-                        streamTimeClockManager.registerListener(streamTimeClockManager.createStreamTimeClock(tmp.getStreamId()), handler);
-                    }
-                }
-            }
-
-            // remove StreamSortSpec
-            if (removed != null && !removed.isEmpty()) {
-                for (Entry<StreamPartition, StreamSortSpec> spec : removed.entrySet()) {
-                    StreamPartition tmp = spec.getKey();
-                    if (copy.containsKey(tmp)) {
-                        retireHandler(copy, tmp);
-                    } else {
-                        LOG.error("Metadata calculation error: remove nonexisting StreamSortSpec " + spec.getValue());
-                    }
-                }
-            }
-
-            // modify StreamSortSpec
-            if (changed != null && !changed.isEmpty()) {
-                for (Entry<StreamPartition, StreamSortSpec> spec : changed.entrySet()) {
-                    StreamPartition tmp = spec.getKey();
-                    if (copy.containsKey(tmp)) {
-                        retireHandler(copy, tmp);
-                        StreamSortHandler handler = createPreparedHandler(tmp, spec.getValue());
-                        copy.put(tmp, handler);
-                        // NOTE(review): registers by stream id here, whereas the "added" branch
-                        // registers with a newly created clock - kept as in the original.
-                        streamTimeClockManager.registerListener(tmp.getStreamId(), handler);
-                    } else {
-                        LOG.error("Metadata calculation error: modify non-existing StreamSortSpec " + spec.getValue());
-                    }
-                }
-            }
-
-            // atomic switch
-            this.streamSortHandlers = copy;
-        }
-    }
-
-    /**
-     * Creates a sort window handler and prepares it with the partition's stream id, the given
-     * sort spec and this router's output collector.
-     */
-    private StreamSortHandler createPreparedHandler(StreamPartition partition, StreamSortSpec sortSpec) {
-        StreamSortHandler handler = new StreamSortWindowHandlerImpl();
-        handler.prepare(partition.getStreamId(), sortSpec, this.outputCollector);
-        return handler;
-    }
-
-    /**
-     * Closes the handler stored for the partition, unregisters it from the clock manager and
-     * removes it from the given map. The caller must have checked that the key is present.
-     */
-    private void retireHandler(Map<StreamPartition, StreamSortHandler> handlers, StreamPartition partition) {
-        StreamSortHandler handler = handlers.get(partition);
-        handler.close();
-        streamTimeClockManager.removeListener(handler);
-        handlers.remove(partition);
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
deleted file mode 100644
index ab05b48..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializers;
-import org.apache.eagle.alert.utils.AlertConstants;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@SuppressWarnings( {"rawtypes", "serial"})
-public abstract class AbstractStreamBolt extends BaseRichBolt implements SerializationMetadataProvider {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamBolt.class);
- private IMetadataChangeNotifyService changeNotifyService;
-
- public Config getConfig() {
- return config;
- }
-
- private Config config;
- private List<String> outputStreamIds;
- protected OutputCollector collector;
- protected Map stormConf;
-
- private String boltId;
- protected PartitionedEventSerializer serializer;
- protected volatile Map<String, StreamDefinition> sdf = new HashMap<String, StreamDefinition>();
- protected volatile String specVersion = "Not Initialized";
- protected volatile boolean specVersionOutofdate = false;
- protected StreamContext streamContext;
-
- public AbstractStreamBolt(String boltId, IMetadataChangeNotifyService changeNotifyService, Config config) {
- this.boltId = boltId;
- this.changeNotifyService = changeNotifyService;
- this.config = config;
- }
-
- public void declareOutputStreams(List<String> outputStreamIds) {
- this.outputStreamIds = outputStreamIds;
- }
-
- protected List<String> getOutputStreamIds() {
- return this.outputStreamIds;
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- Preconditions.checkNotNull(this.changeNotifyService, "IMetadataChangeNotifyService is not set yet");
- this.stormConf = stormConf;
- this.collector = collector;
- this.serializer = Serializers.newPartitionedEventSerializer(this);
- internalPrepare(collector, this.changeNotifyService, this.config, context);
- try {
- this.changeNotifyService.activateFetchMetaData();
- } catch (Exception e) {
- LOG.warn(e.getMessage(), e);
- }
- }
-
-
- protected PartitionedEvent deserialize(Object object) throws IOException {
- // byte[] in higher priority
- if (object instanceof byte[]) {
- return serializer.deserialize((byte[]) object);
- } else if (object instanceof PartitionedEvent) {
- return (PartitionedEvent) object;
- } else {
- throw new IllegalStateException(String.format("Unsupported event class '%s', expect byte array or PartitionedEvent!", object == null ? null : object.getClass().getCanonicalName()));
- }
- }
-
- /**
- * subclass should implement more initialization for example.
- * 1) register metadata change
- * 2) init stream context
- *
- * @param collector
- * @param metadataManager
- * @param config
- * @param context
- */
- public abstract void internalPrepare(
- OutputCollector collector,
- IMetadataChangeNotifyService metadataManager,
- Config config, TopologyContext context);
-
- @Override
- public void cleanup() {
- super.cleanup();
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- if (this.outputStreamIds != null) {
- LOG.info("declare streams: {} ", outputStreamIds);
- for (String streamId : this.outputStreamIds) {
- declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0));
- }
- } else {
- declarer.declare(new Fields(AlertConstants.FIELD_0));
- }
- }
-
- @Override
- public StreamDefinition getStreamDefinition(String streamId) throws StreamNotDefinedException {
- if (sdf.containsKey(streamId)) {
- return sdf.get(streamId);
- } else {
- throw new StreamNotDefinedException(streamId, specVersion);
- }
- }
-
- public String getBoltId() {
- return boltId;
- }
-}
\ No newline at end of file