You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:31 UTC

[23/84] [partial] eagle git commit: Clean repo for eagle site

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
deleted file mode 100644
index 87a067f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.publisher.template;
-
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.engine.coordinator.AlertDefinition;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.common.DateTimeUtil;
-import org.apache.velocity.Template;
-import org.apache.velocity.VelocityContext;
-import org.apache.velocity.app.Velocity;
-import org.apache.velocity.app.VelocityEngine;
-import org.apache.velocity.runtime.RuntimeConstants;
-import org.apache.velocity.runtime.resource.loader.StringResourceLoader;
-import org.apache.velocity.runtime.resource.util.StringResourceRepository;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-public class VelocityAlertTemplateEngine implements AlertTemplateEngine {
-    private static final String ALERT_BODY_TPL_PREFIX = "AlertBodyTemplate";
-    private static final String ALERT_SUBJECT_TPL_PREFIX = "AlertSubjectTemplate";
-    private static final Logger LOG = LoggerFactory.getLogger(VelocityAlertTemplateEngine.class);
-    private StringResourceRepository stringResourceRepository;
-    private Map<String, PolicyDefinition> policyDefinitionRepository;
-    private VelocityEngine engine;
-
-
-    @Override
-    public void init(Config config) {
-        engine = new VelocityEngine();
-        engine.setProperty(RuntimeConstants.RUNTIME_LOG_LOGSYSTEM_CLASS, "org.apache.velocity.runtime.log.Log4JLogChute");
-        engine.setProperty("runtime.log.logsystem.log4j.logger", LOG.getName());
-        engine.setProperty(Velocity.RESOURCE_LOADER, "string");
-        engine.addProperty("string.resource.loader.class", StringResourceLoader.class.getName());
-        engine.addProperty("string.resource.loader.repository.static", "false");
-        engine.init();
-
-        stringResourceRepository = (StringResourceRepository) engine.getApplicationAttribute(StringResourceLoader.REPOSITORY_NAME_DEFAULT);
-        policyDefinitionRepository = new HashMap<>();
-    }
-
-    private String getAlertBodyTemplateName(String policyId) {
-        return String.format("%s:%s", ALERT_BODY_TPL_PREFIX, policyId);
-    }
-
-    private String getAlertSubjectTemplateName(String policyId) {
-        return String.format("%s:%s", ALERT_SUBJECT_TPL_PREFIX, policyId);
-    }
-
-    @Override
-    public synchronized void register(PolicyDefinition policyDefinition) {
-        LOG.info("Registering {}", policyDefinition.getName());
-        Preconditions.checkNotNull(policyDefinition.getName(), "policyId is null");
-        AlertDefinition alertDefinition = policyDefinition.getAlertDefinition();
-        if (alertDefinition == null) {
-            LOG.warn("Subject template of policy {} is null, using policy name by default");
-            stringResourceRepository.putStringResource(getAlertSubjectTemplateName(policyDefinition.getName()), policyDefinition.getName());
-
-            LOG.warn("Body template of policy {} is null, using $ALERT_EVENT by default");
-            String defaultAlertBodyTmpl = String.format("Message: $%s (Auto-generated alert message as template not defined in policy %s)",
-                AlertContextFields.ALERT_EVENT, policyDefinition.getName());
-            stringResourceRepository.putStringResource(getAlertBodyTemplateName(policyDefinition.getName()), defaultAlertBodyTmpl);
-        } else if (alertDefinition.getTemplateType().equals(AlertDefinition.TemplateType.TEXT)) {
-            if (alertDefinition.getSubject() != null) {
-                stringResourceRepository.putStringResource(getAlertSubjectTemplateName(policyDefinition.getName()), alertDefinition.getSubject());
-            } else {
-                LOG.warn("Subject template of policy {} is null, using policy name by default");
-                stringResourceRepository.putStringResource(getAlertSubjectTemplateName(policyDefinition.getName()), policyDefinition.getName());
-            }
-            if (alertDefinition.getBody() != null) {
-                stringResourceRepository.putStringResource(getAlertBodyTemplateName(policyDefinition.getName()), alertDefinition.getBody());
-            } else {
-                LOG.warn("Body template of policy {} is null, using ALERT_EVENT by default");
-                stringResourceRepository.putStringResource(getAlertBodyTemplateName(policyDefinition.getName()), "$" + AlertContextFields.ALERT_EVENT);
-            }
-        } else {
-            throw new IllegalArgumentException("Unsupported alert template type " + alertDefinition.getTemplateType());
-        }
-        policyDefinitionRepository.put(policyDefinition.getName(), policyDefinition);
-    }
-
-    @Override
-    public synchronized void unregister(String policyId) {
-        LOG.info("Unregistering {}", policyId);
-        stringResourceRepository.removeStringResource(getAlertBodyTemplateName(policyId));
-        stringResourceRepository.removeStringResource(getAlertSubjectTemplateName(policyId));
-        policyDefinitionRepository.remove(policyId);
-    }
-
-    @Override
-    public synchronized AlertStreamEvent filter(AlertStreamEvent event) {
-        Preconditions.checkArgument(this.policyDefinitionRepository.containsKey(event.getPolicyId()), "Unknown policyId " + event.getPolicyId());
-        PolicyDefinition policyDefinition = this.policyDefinitionRepository.get(event.getPolicyId());
-        StringWriter bodyWriter = new StringWriter();
-        StringWriter subjectWriter = new StringWriter();
-        try {
-            VelocityContext alertContext = buildAlertContext(policyDefinition, event);
-            Template template = engine.getTemplate(getAlertBodyTemplateName(event.getPolicyId()));
-            template.merge(alertContext, bodyWriter);
-            event.setBody(bodyWriter.toString());
-
-            template = engine.getTemplate(getAlertSubjectTemplateName(event.getPolicyId()));
-            template.merge(alertContext, subjectWriter);
-            event.setSubject(subjectWriter.toString());
-        } finally {
-            try {
-                bodyWriter.close();
-            } catch (IOException e) {
-                LOG.warn(e.getMessage(), e);
-            }
-            try {
-                subjectWriter.close();
-            } catch (IOException e) {
-                LOG.warn(e.getMessage(), e);
-            }
-        }
-        return event;
-    }
-
-    @Override
-    public synchronized Collection<PolicyDefinition> getPolicies() {
-        return policyDefinitionRepository.values();
-    }
-
-    private static VelocityContext buildAlertContext(PolicyDefinition policyDefinition, AlertStreamEvent event) {
-        VelocityContext context = new VelocityContext();
-        context.put(AlertContextFields.SITE_ID, event.getSiteId());
-        context.put(AlertContextFields.STREAM_ID, event.getStreamId());
-        context.put(AlertContextFields.ALERT_ID, event.getAlertId());
-        context.put(AlertContextFields.CREATED_BY, event.getCreatedBy());
-        context.put(AlertContextFields.CREATED_TIMESTAMP, event.getCreatedTime());
-        context.put(AlertContextFields.CREATED_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
-        context.put(AlertContextFields.ALERT_TIMESTAMP, event.getTimestamp());
-        context.put(AlertContextFields.ALERT_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getTimestamp()));
-        context.put(AlertContextFields.ALERT_SCHEMA, event.getSchema());
-        context.put(AlertContextFields.ALERT_EVENT, event);
-
-        context.put(AlertContextFields.POLICY_ID, policyDefinition.getName());
-        context.put(AlertContextFields.POLICY_DESC, policyDefinition.getDescription());
-        context.put(AlertContextFields.POLICY_TYPE, policyDefinition.getDefinition().getType());
-        context.put(AlertContextFields.POLICY_DEFINITION, policyDefinition.getDefinition().getValue());
-        context.put(AlertContextFields.POLICY_HANDLER, policyDefinition.getDefinition().getHandlerClass());
-
-        for (Map.Entry<String, Object> entry : event.getDataMap().entrySet()) {
-            context.put(entry.getKey(), entry.getValue());
-        }
-        return context;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java
deleted file mode 100644
index a824a0d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.publisher.template;
-
-import org.apache.velocity.Template;
-import org.apache.velocity.VelocityContext;
-import org.apache.velocity.app.Velocity;
-import org.apache.velocity.app.VelocityEngine;
-import org.apache.velocity.exception.MethodInvocationException;
-import org.apache.velocity.exception.ParseErrorException;
-import org.apache.velocity.runtime.RuntimeConstants;
-import org.apache.velocity.runtime.parser.node.ASTReference;
-import org.apache.velocity.runtime.parser.node.ASTprocess;
-import org.apache.velocity.runtime.resource.loader.StringResourceLoader;
-import org.apache.velocity.runtime.resource.util.StringResourceRepository;
-import org.apache.velocity.runtime.visitor.NodeViewMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Parses a single Velocity template string and exposes the references it contains.
- *
- * <p>The template is compiled once in the constructor with strict reference checking enabled
- * ({@code runtime.references.strict=true}); its syntax tree is then walked to record every
- * reference node.</p>
- */
-public class VelocityTemplateParser {
-    private static final Logger LOG = LoggerFactory.getLogger(VelocityTemplateParser.class);
-    private static final String TEMPLATE_NAME = "template";
-    private final Template template;
-    private final ParserNodeVisitor visitor;
-
-    /**
-     * Compiles the given template text and collects its references.
-     *
-     * @param templateString Velocity template source
-     * @throws ParseErrorException if the template cannot be parsed
-     */
-    public VelocityTemplateParser(String templateString) throws ParseErrorException {
-        VelocityEngine engine = new VelocityEngine();
-        engine.setProperty(RuntimeConstants.RUNTIME_LOG_LOGSYSTEM_CLASS, "org.apache.velocity.runtime.log.Log4JLogChute");
-        engine.setProperty("runtime.log.logsystem.log4j.logger", LOG.getName());
-        engine.setProperty(Velocity.RESOURCE_LOADER, "string");
-        engine.addProperty("string.resource.loader.class", StringResourceLoader.class.getName());
-        engine.addProperty("string.resource.loader.repository.static", "false");
-        engine.addProperty("runtime.references.strict", "true");
-        engine.init();
-        StringResourceRepository resourceRepository = (StringResourceRepository) engine.getApplicationAttribute(StringResourceLoader.REPOSITORY_NAME_DEFAULT);
-        resourceRepository.putStringResource(TEMPLATE_NAME, templateString);
-        template = engine.getTemplate(TEMPLATE_NAME);
-        ASTprocess data = (ASTprocess) template.getData();
-        visitor = new ParserNodeVisitor();
-        data.jjtAccept(visitor, null);
-    }
-
-    /**
-     * Returns the root names of the references found in the template, in the order the visitor
-     * encountered them. A name appears once per reference node, so duplicates are possible.
-     */
-    public List<String> getReferenceNames() {
-        return this.visitor.getReferenceNames();
-    }
-
-    /** Returns the compiled template. */
-    public Template getTemplate() {
-        return template;
-    }
-
-    /**
-     * Merges the template with the given variables and discards the rendered output.
-     *
-     * @param context variable names and values to expose to the template
-     * @throws MethodInvocationException if required variable is missing in context.
-     */
-    public void validateContext(Map<String, Object> context) throws MethodInvocationException {
-        VelocityContext velocityContext = new VelocityContext();
-        for (Map.Entry<String, Object> entry : context.entrySet()) {
-            velocityContext.put(entry.getKey(), entry.getValue());
-        }
-        template.merge(velocityContext, new StringWriter());
-    }
-
-    /**
-     * Tree visitor that records the root string of every reference node it visits.
-     * Declared static because it never touches the enclosing parser instance.
-     */
-    private static final class ParserNodeVisitor extends NodeViewMode {
-        private final List<String> referenceNames = new ArrayList<>();
-
-        @Override
-        public Object visit(ASTReference node, Object data) {
-            referenceNames.add(node.getRootString());
-            return super.visit(node, data);
-        }
-
-        public List<String> getReferenceNames() {
-            return this.referenceNames;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
deleted file mode 100644
index e1f3e9c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * Callback interface whose single method receives an {@link AlertBoltSpec} together with a map
- * of {@link StreamDefinition}s.
- *
- * <p>NOTE(review): the method name suggests it is invoked when the alert bolt spec changes, and
- * {@code sds} is presumably keyed by stream id; neither is visible here - confirm against the
- * caller.</p>
- *
- * <p>Since 5/2/16.</p>
- */
-public interface AlertBoltSpecListener {
-    void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds);
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
deleted file mode 100644
index 598ce18..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-import java.util.Map;
-
-/**
- * Callback interface whose single method receives a {@link SpoutSpec} together with a map of
- * {@link StreamDefinition}s.
- *
- * <p>NOTE(review): the method name suggests it is invoked when the spout spec changes, and
- * {@code sds} is presumably keyed by stream id; neither is visible here - confirm against the
- * caller.</p>
- *
- * <p>Since 5/3/16.</p>
- */
-public interface SpoutSpecListener {
-    void onSpoutSpecChange(SpoutSpec spec, Map<String, StreamDefinition> sds);
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
deleted file mode 100644
index 88ffadb..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-import java.util.List;
-
-/**
- * Output abstraction used to emit events and to acknowledge or fail them.
- *
- * <p>Two emit variants exist: one takes a stream id plus a {@link PartitionedEvent} and may throw
- * a checked exception, the other takes a raw tuple as a list of objects. {@code ack} and
- * {@code fail} each take the {@link PartitionedEvent} they refer to.</p>
- *
- * <p>NOTE(review): the ack/fail naming looks like tuple acknowledgement of a streaming runtime;
- * the exact delivery semantics depend on the implementation - confirm there.</p>
- */
-public interface StreamOutputCollector {
-    void emit(String streamId, PartitionedEvent partitionedEvent) throws Exception;
-
-    void emit(List<Object> tuple);
-
-    void ack(PartitionedEvent partitionedEvent);
-
-    void fail(PartitionedEvent partitionedEvent);
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
deleted file mode 100644
index 049e852..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * <b></b>
- * 1. Group by SingleStream[stream_1.col1]
- *
- * <p>Shuffle(stream_1,[col1])</p>
- *
- * <b></b>
- * 2. Group by SingleStream[stream_1.col1,stream_1.col2]
- *
- * <p>Shuffle(stream_1,[col1,col2])</p>
- *
- * <b></b>
- * 3. Group by JoinedStream[stream_1.col1,stream_1.col2,stream_2.col3]
- * <p>Shuffle(stream_1.col1,stream_1.col2) + Global(stream_2.col3)</p>
- */
-public class StreamRoute implements Serializable {
-    private static final long serialVersionUID = 4649184902196034940L;
-
-    private String targetComponentId;
-    private int partitionKey;
-    private String partitionType;
-
-    public String getTargetComponentId() {
-        return targetComponentId;
-    }
-
-    public void setTargetComponentId(String targetComponentId) {
-        this.targetComponentId = targetComponentId;
-    }
-
-    public StreamRoute(String targetComponentId, int partitionKey, StreamPartition.Type type) {
-        this.setTargetComponentId(targetComponentId);
-        this.setPartitionKey(partitionKey);
-        this.setPartitionType(type);
-    }
-
-    public int getPartitionKey() {
-        return partitionKey;
-    }
-
-    public void setPartitionKey(int partitionKey) {
-        this.partitionKey = partitionKey;
-    }
-
-    public StreamPartition.Type getPartitionType() {
-        return StreamPartition.Type.valueOf(partitionType);
-    }
-
-    public void setPartitionType(StreamPartition.Type partitionType) {
-        this.partitionType = partitionType.name();
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder().append(partitionKey).append(partitionType).append(targetComponentId).build();
-    }
-
-    @Override
-    public String toString() {
-        return String.format("Route[target=%s, key=%s, type=%s]", this.targetComponentId, this.partitionKey, this.partitionType);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
deleted file mode 100644
index 0d397e4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.router.impl.BasicStreamRoutePartitioner;
-
-import java.util.List;
-
-public class StreamRoutePartitionFactory {
-    /**
-     * TODO: Decouple different StreamRoutePartitioner implementation from BasicStreamRoutePartitioner.
-     *
-     * @param outputComponentIds
-     * @param streamDefinition
-     * @param partition
-     * @return
-     */
-    public static StreamRoutePartitioner createRoutePartitioner(List<String> outputComponentIds, StreamDefinition streamDefinition, StreamPartition partition) {
-        return new BasicStreamRoutePartitioner(outputComponentIds, streamDefinition, partition);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
deleted file mode 100644
index 5b5632d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-import java.util.List;
-/**
- * Strategy interface that maps a {@link StreamEvent} to a list of {@link StreamRoute}s.
- *
- * <p>NOTE(review): whether the returned list may be empty or null is up to the implementation
- * and is not visible here - confirm before relying on it.</p>
- */
-public interface StreamRoutePartitioner {
-    List<StreamRoute> partition(StreamEvent event);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
deleted file mode 100644
index dfd2cc4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-import java.util.Collection;
-import java.util.Map;
-/**
- * Callback interface whose single method receives three collections of {@link StreamRouterSpec}
- * (added, removed and modified) together with a map of {@link StreamDefinition}s.
- *
- * <p>NOTE(review): {@code sds} is presumably keyed by stream id, and the three collections
- * presumably describe the difference to the previous spec - confirm against the caller.</p>
- */
-public interface StreamRouteSpecListener {
-    void onStreamRouterSpecChange(Collection<StreamRouterSpec> added,
-                                  Collection<StreamRouterSpec> removed,
-                                  Collection<StreamRouterSpec> modified,
-                                  Map<String, StreamDefinition> sds);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
deleted file mode 100644
index a9efc97..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-import java.io.Serializable;
-
-/**
- * A serializable, named component that consumes {@link PartitionedEvent}s and is also a
- * {@link StreamSortSpecListener}, i.e. it is notified when stream sort specs change.
- */
-public interface StreamRouter extends StreamSortSpecListener, Serializable {
-    /**
-     * Supplies the router with its context and the collector it should write to.
-     * NOTE(review): presumably must be called once before {@link #nextEvent} - confirm with implementations.
-     *
-     * @param context         stream context handed to the router
-     * @param outputCollector collector for the router's output events
-     */
-    void prepare(StreamContext context, PartitionedEventCollector outputCollector);
-
-    /**
-     * Hands the next event to the router.
-     *
-     * @param event the partitioned event to process
-     */
-    void nextEvent(PartitionedEvent event);
-
-    /**
-     * Returns the name of this router.
-     */
-    String getName();
-
-    /**
-     * Closes the router.
-     */
-    void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
deleted file mode 100644
index 0016fc0..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-import java.util.Map;
-
-/**
- * Listener for changes to the {@link RouterSpec} of a stream router bolt.
- *
- * @since 5/1/16.
- */
-public interface StreamRouterBoltSpecListener {
-    /**
-     * Notifies the listener of a router spec change.
-     *
-     * @param spec the router spec delivered with the change
-     * @param sds  stream definitions keyed by a String (presumably the stream id - TODO confirm)
-     */
-    void onStreamRouteBoltSpecChange(RouterSpec spec, Map<String, StreamDefinition> sds);
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
deleted file mode 100644
index 613ab7f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener;
-
-/**
- * Handler that consumes {@link PartitionedEvent}s for one stream and is also a
- * {@link StreamTimeClockListener}.
- */
-public interface StreamSortHandler extends StreamTimeClockListener {
-
-    /**
-     * Supplies the handler with the stream it serves, its sort spec and its output collector.
-     *
-     * @param streamId           id of the stream handled
-     * @param streamSortSpecSpec sort specification for the stream
-     * @param outputCollector    collector for the handler's output events
-     */
-    void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector);
-
-    /**
-     * Hands the next event to the handler.
-     *
-     * @param event the partitioned event to process.
-     */
-    void nextEvent(PartitionedEvent event);
-
-    /**
-     * Closes the handler.
-     */
-    void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
deleted file mode 100644
index 087a46f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-
-import java.util.Map;
-
-/**
- * Callback interface for receiving changes to per-partition {@link StreamSortSpec}s.
- */
-public interface StreamSortSpecListener {
-    /**
-     * Notifies the listener of a change, expressed as three maps from partition to sort spec.
-     *
-     * @param added   sort specs that were added, keyed by stream partition
-     * @param removed sort specs that were removed, keyed by stream partition
-     * @param changed sort specs that were changed, keyed by stream partition
-     */
-    void onStreamSortSpecChange(Map<StreamPartition, StreamSortSpec> added,
-                                Map<StreamPartition, StreamSortSpec> removed,
-                                Map<StreamPartition, StreamSortSpec> changed);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
deleted file mode 100644
index a97e1da..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/BasicStreamRoutePartitioner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.StreamRoute;
-import org.apache.eagle.alert.engine.router.StreamRoutePartitioner;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * {@link StreamRoutePartitioner} that picks the target component(s) for an event according to
- * the type of the configured {@link StreamPartition}:
- * <ul>
- * <li>{@code GLOBAL}: one route per output component;</li>
- * <li>{@code GROUPBY}: a single route chosen from the hash of the partition columns;</li>
- * <li>any other type: a single route chosen from the current time in milliseconds.</li>
- * </ul>
- */
-public class BasicStreamRoutePartitioner implements StreamRoutePartitioner {
-    private final List<String> outputComponentIds;
-    private final StreamDefinition streamDefinition;
-    private final StreamPartition streamPartition;
-    // Lazily built by routeToAll() and reused afterwards; null until the first GLOBAL route is requested.
-    private List<StreamRoute> globalRoutingKeys = null;
-
-    /**
-     * @param outputComponentIds ids of the components events may be routed to
-     * @param streamDefinition   definition used to read the partition columns out of an event
-     * @param partition          partition whose type and columns drive the routing decision
-     */
-    public BasicStreamRoutePartitioner(List<String> outputComponentIds, StreamDefinition streamDefinition, StreamPartition partition) {
-        this.outputComponentIds = outputComponentIds;
-        this.streamDefinition = streamDefinition;
-        this.streamPartition = partition;
-    }
-
-    /**
-     * Computes the route(s) for the given event based on the partition type.
-     *
-     * @param event event to route
-     * @return the routes the event should be sent to
-     */
-    @Override
-    public List<StreamRoute> partition(StreamEvent event) {
-        switch (this.streamPartition.getType()) {
-            case GLOBAL:
-                return routeToAll(event);
-            case GROUPBY:
-                return routeByGroupByKey(event);
-            default:
-                return routeByShuffle(event);
-        }
-    }
-
-    /**
-     * Routes to exactly one component, selected by the hash of the event's partition-column data.
-     * The hash itself is carried in the returned route as its partition key.
-     */
-    protected List<StreamRoute> routeByGroupByKey(StreamEvent event) {
-        int partitionKey = new HashCodeBuilder().append(event.getData(streamDefinition, this.streamPartition.getColumns())).build();
-        // Take the remainder first and the absolute value second: Math.abs(Integer.MIN_VALUE) is still
-        // negative, whereas the remainder is always within (-size, size) and therefore safe to negate.
-        // For every other key this selects the same index as Math.abs(partitionKey) % size.
-        int index = Math.abs(partitionKey % this.outputComponentIds.size());
-        String selectedOutputStream = outputComponentIds.get(index);
-        return Collections.singletonList(new StreamRoute(selectedOutputStream, partitionKey, StreamPartition.Type.GROUPBY));
-    }
-
-    /**
-     * Routes to exactly one component, selected from the current wall-clock time in milliseconds.
-     */
-    protected List<StreamRoute> routeByShuffle(StreamEvent event) {
-        int hash = (int) System.currentTimeMillis();
-        // Same overflow-safe ordering as in routeByGroupByKey: remainder first, then absolute value.
-        int index = Math.abs(hash % outputComponentIds.size());
-        return Arrays.asList(new StreamRoute(outputComponentIds.get(index), -1, StreamPartition.Type.SHUFFLE));
-    }
-
-    /**
-     * Routes to every output component. The route list is built on first use and cached.
-     */
-    protected List<StreamRoute> routeToAll(StreamEvent event) {
-        // Build the cache when it does NOT exist yet. The previous "!= null" test could never be
-        // true for the null-initialised field, so this method always returned null.
-        if (globalRoutingKeys == null) {
-            List<StreamRoute> routes = new ArrayList<>(outputComponentIds.size());
-            for (String targetId : outputComponentIds) {
-                routes.add(new StreamRoute(targetId, -1, StreamPartition.Type.GLOBAL));
-            }
-            // Publish only the fully populated list.
-            globalRoutingKeys = routes;
-        }
-        return globalRoutingKeys;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
deleted file mode 100644
index 5c10675..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/RoutePhysicalGrouping.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.trident.partition.GlobalGrouping;
-
-import java.util.*;
-
-public class RoutePhysicalGrouping implements CustomStreamGrouping {
-    private static final long serialVersionUID = -511915083994148362L;
-    private static final Logger LOG = LoggerFactory.getLogger(RoutePhysicalGrouping.class);
-    private List<Integer> outdegreeTasks;
-    private ShuffleGrouping shuffleGroupingDelegate;
-    private GlobalGrouping globalGroupingDelegate;
-    private Map<String, Integer> connectedTargetIds;
-
-    @Override
-    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
-        this.outdegreeTasks = new ArrayList<>(targetTasks);
-        shuffleGroupingDelegate = new ShuffleGrouping();
-        shuffleGroupingDelegate.prepare(context, stream, targetTasks);
-        globalGroupingDelegate = new GlobalGrouping();
-        globalGroupingDelegate.prepare(context, stream, targetTasks);
-        connectedTargetIds = new HashMap<>();
-        for (Integer targetId : targetTasks) {
-            String targetComponentId = context.getComponentId(targetId);
-            connectedTargetIds.put(targetComponentId, targetId);
-        }
-        LOG.info("OutDegree components: [{}]", StringUtils.join(connectedTargetIds.values(), ","));
-    }
-
-    @Override
-    public List<Integer> chooseTasks(int taskId, List<Object> values) {
-        Object routingKeyObj = values.get(0);
-        if (routingKeyObj != null) {
-            PartitionedEvent partitionedEvent = (PartitionedEvent) routingKeyObj;
-            if (partitionedEvent.getPartition().getType() == StreamPartition.Type.GLOBAL) {
-                return globalGroupingDelegate.chooseTasks(taskId, values);
-            } else if (partitionedEvent.getPartition().getType() == StreamPartition.Type.GROUPBY) {
-                return Collections.singletonList(outdegreeTasks.get((int) (partitionedEvent.getPartitionKey() % this.outdegreeTasks.size())));
-            }
-            // Shuffle by defaults
-            return shuffleGroupingDelegate.chooseTasks(taskId, values);
-        }
-
-        LOG.warn("Illegal null StreamRoute, throw event");
-        return Collections.emptyList();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
deleted file mode 100644
index 752c742..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/ShuffleGrouping.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Stream grouping that hands out target tasks one at a time from a randomly shuffled list,
- * reshuffling each time the list has been walked through.
- *
- * <p>NOTE: This is copy from storm 1.0.0 code. DON'T modify it.
- *
- * @since May 4, 2016
- */
-public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
-    private static final long serialVersionUID = 5035497345182141765L;
-    // Source of randomness for shuffling; created in prepare().
-    private Random random;
-    // One single-element list per target task, kept in shuffled order.
-    private ArrayList<List<Integer>> choices;
-    // Position of the most recently handed-out entry in choices.
-    private AtomicInteger current;
-
-    /**
-     * Wraps every target task id in its own single-element list, shuffles those lists and
-     * resets the position counter to 0. The context and stream arguments are not used.
-     */
-    @Override
-    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
-        random = new Random();
-        choices = new ArrayList<List<Integer>>(targetTasks.size());
-        for (Integer i : targetTasks) {
-            choices.add(Arrays.asList(i));
-        }
-        Collections.shuffle(choices, random);
-        current = new AtomicInteger(0);
-    }
-
-    /**
-     * Returns the next single-task list. Neither argument influences the choice.
-     *
-     * @return a one-element list holding the chosen task id
-     */
-    @Override
-    public List<Integer> chooseTasks(int taskId, List<Object> values) {
-        int rightNow;
-        int size = choices.size();
-        while (true) {
-            // Atomically claim the next position.
-            rightNow = current.incrementAndGet();
-            if (rightNow < size) {
-                return choices.get(rightNow);
-            } else if (rightNow == size) {
-                // This caller claimed the position one past the end: wrap around and reshuffle.
-                current.set(0);
-                //This should be thread safe so long as ArrayList does not have any internal state that can be messed up by multi-threaded access.
-                Collections.shuffle(choices, random);
-                return choices.get(0);
-            }
-            // rightNow > size: race condition with another thread, and we lost;
-            // try again
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
deleted file mode 100644
index 7b8f344..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.engine.router.impl;
-
-import backtype.storm.task.OutputCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.router.StreamOutputCollector;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * {@link StreamOutputCollector} backed by a Storm {@link OutputCollector}. Events are emitted
- * anchored to their own anchor, either as-is or serialized when a serializer was supplied.
- */
-public class StormOutputCollector implements StreamOutputCollector {
-
-    private final OutputCollector outputCollector;
-    private final PartitionedEventSerializer serializer;
-
-    /**
-     * Creates a collector that emits events without serializing them.
-     *
-     * @param outputCollector the Storm collector to delegate to
-     */
-    public StormOutputCollector(OutputCollector outputCollector) {
-        this(outputCollector, null);
-    }
-
-    /**
-     * @param outputCollector the Storm collector to delegate to
-     * @param serializer      serializer applied to each emitted event; {@code null} emits the event object itself
-     */
-    public StormOutputCollector(OutputCollector outputCollector, PartitionedEventSerializer serializer) {
-        this.outputCollector = outputCollector;
-        this.serializer = serializer;
-    }
-
-    /**
-     * Emits the event on the given stream, anchored to the event's anchor.
-     *
-     * @param streamId         stream to emit on
-     * @param partitionedEvent event to emit
-     * @throws Exception declared by the interface signature; may originate from serialization
-     */
-    @Override
-    public void emit(String streamId, PartitionedEvent partitionedEvent) throws Exception {
-        if (serializer != null) {
-            // A serializer is configured: ship the serialized form.
-            this.outputCollector.emit(streamId, Collections.singletonList(partitionedEvent.getAnchor()), Collections.singletonList(this.serializer.serialize(partitionedEvent)));
-            return;
-        }
-        // No serializer: ship the event object itself.
-        this.outputCollector.emit(streamId, Collections.singletonList(partitionedEvent.getAnchor()), Collections.singletonList(partitionedEvent));
-    }
-
-    /**
-     * Emits a raw tuple through the underlying collector.
-     */
-    @Override
-    public void emit(List<Object> tuple) {
-        this.outputCollector.emit(tuple);
-    }
-
-    /**
-     * Acknowledges the anchor of the given event.
-     */
-    @Override
-    public void ack(PartitionedEvent partitionedEvent) {
-        this.outputCollector.ack(partitionedEvent.getAnchor());
-    }
-
-    /**
-     * Fails the anchor of the given event.
-     */
-    @Override
-    public void fail(PartitionedEvent partitionedEvent) {
-        this.outputCollector.fail(partitionedEvent.getAnchor());
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
deleted file mode 100644
index 2eb101a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import com.google.common.collect.Lists;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.router.*;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * After sorting, one stream's message will be routed based on its StreamPartition
- * One stream may have multiple StreamPartitions based on how this stream is grouped by.
- * TODO: Add metric statistics
- */
-public class StreamRouterBoltOutputCollector implements PartitionedEventCollector, StreamRouteSpecListener {
-    private static final Logger LOG = LoggerFactory.getLogger(StreamRouterBoltOutputCollector.class);
-    private final StreamOutputCollector outputCollector;
-    private final Object outputLock = new Object();
-    //    private final List<String> outputStreamIds;
-    private final StreamContext streamContext;
-    private volatile Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap;
-    private volatile Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap;
-    private final String sourceId;
-
-    /**
-     * Creates a collector whose routing tables start out empty.
-     *
-     * @param sourceId        id of the emitting component, used when deriving target stream ids
-     * @param outputCollector collector that events are emitted, acked and failed through
-     * @param outputStreamIds accepted but not retained by this constructor
-     * @param streamContext   context whose counter records send/emit/fail counts
-     */
-    public StreamRouterBoltOutputCollector(String sourceId, StreamOutputCollector outputCollector, List<String> outputStreamIds, StreamContext streamContext) {
-        this.streamContext = streamContext;
-        this.outputCollector = outputCollector;
-        this.sourceId = sourceId;
-        // Routing state is empty until a spec change is delivered.
-        this.routePartitionerMap = new HashMap<>();
-        this.routeSpecMap = new HashMap<>();
-    }
-
-    /**
-     * Routes one event to the downstream stream(s) selected by the partitioners registered for
-     * the event's {@link StreamPartition}, then acks it.
-     *
-     * <p>The event is dropped when no router spec is known for its partition, and failed when no
-     * partitioner is registered for it or when any exception occurs while routing.
-     *
-     * @param event the event to route
-     */
-    public void emit(PartitionedEvent event) {
-        try {
-            this.streamContext.counter().incr("send_count");
-            StreamPartition partition = event.getPartition();
-            List<StreamRouterSpec> routerSpecs = routeSpecMap.get(partition);
-            if (routerSpecs == null || routerSpecs.size() <= 0) {
-                if (LOG.isDebugEnabled()) {
-                    // Don't know how to route stream, if it's correct, it's better to filter useless stream in spout side
-                    LOG.debug("Drop event {} as StreamPartition {} is not pointed to any router metadata {}", event, event.getPartition(), routeSpecMap);
-                }
-                this.drop(event);
-                return;
-            }
-
-            // A spec exists but no partitioner was built for it: fail the event.
-            if (routePartitionerMap.get(partition) == null) {
-                LOG.error("Partitioner for " + routerSpecs.get(0) + " is null");
-                synchronized (outputLock) {
-                    this.streamContext.counter().incr("fail_count");
-                    this.outputCollector.fail(event);
-                }
-                return;
-            }
-
-            StreamEvent newEvent = event.getEvent().copy();
-
-            // Get handler for the partition
-            // NOTE(review): routePartitionerMap is read a second time here; the field is volatile and
-            // may have been swapped since the null check above - confirm whether this can yield null.
-            List<StreamRoutePartitioner> queuePartitioners = routePartitionerMap.get(partition);
-
-            // All emits for this event and its final ack happen under the same lock.
-            synchronized (outputLock) {
-                for (StreamRoutePartitioner queuePartitioner : queuePartitioners) {
-                    List<StreamRoute> streamRoutes = queuePartitioner.partition(newEvent);
-                    // it is possible that one event can be sent to multiple slots in one slotqueue if that is All grouping
-                    for (StreamRoute streamRoute : streamRoutes) {
-                        String targetStreamId = StreamIdConversion.generateStreamIdBetween(sourceId, streamRoute.getTargetComponentId());
-                        try {
-                            PartitionedEvent emittedEvent = new PartitionedEvent(newEvent, partition, streamRoute.getPartitionKey());
-                            // Route Target Stream id instead of component id
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Emitted to stream {} with message {}", targetStreamId, emittedEvent);
-                            }
-                            // NOTE(review): emittedEvent (copied payload + the route's partition key) is built
-                            // and logged above, but the original `event` is what gets emitted here. This looks
-                            // unintended; emitting emittedEvent would presumably also require carrying over
-                            // event's anchor - confirm against PartitionedEvent before changing.
-                            outputCollector.emit(targetStreamId, event);
-                            this.streamContext.counter().incr("emit_count");
-                        } catch (RuntimeException ex) {
-                            // Counted here and rethrown; the outer catch below counts and fails the event again.
-                            this.streamContext.counter().incr("fail_count");
-                            LOG.error("Failed to emit to {} with {}", targetStreamId, newEvent, ex);
-                            throw ex;
-                        }
-                    }
-                }
-                // Ack only after every route has been emitted successfully.
-                outputCollector.ack(event);
-            }
-        } catch (Exception ex) {
-            LOG.error(ex.getMessage(), ex);
-            synchronized (outputLock) {
-                this.streamContext.counter().incr("fail_count");
-                this.outputCollector.fail(event);
-            }
-        }
-    }
-
-    /**
-     * Applies a batch of router-spec changes to private copies of the routing tables and then
-     * publishes the copies by reassigning {@code routeSpecMap} and {@code routePartitionerMap}.
-     *
-     * <p>The per-partition lists are copied together with the maps, so the in-place list
-     * mutations performed while adding a spec never touch the lists that are still referenced
-     * by the tables being replaced.
-     *
-     * @param added    specs to add; a spec already registered for its partition is logged and skipped
-     * @param removed  specs to remove; a spec that is not registered is logged and skipped
-     * @param modified specs to replace; logged and skipped when the partition is unknown or when
-     *                 the identical spec is already registered
-     * @param sds      stream definitions looked up by stream id when partitioners are built
-     */
-    @Override
-    public void onStreamRouterSpecChange(Collection<StreamRouterSpec> added,
-                                         Collection<StreamRouterSpec> removed,
-                                         Collection<StreamRouterSpec> modified,
-                                         Map<String, StreamDefinition> sds) {
-        // A plain new HashMap<>(map) would share the value lists with the live tables,
-        // so every list is copied as well before anything is modified.
-        Map<StreamPartition, List<StreamRouterSpec>> copyRouteSpecMap = new HashMap<>();
-        routeSpecMap.forEach((partition, specs) ->
-            copyRouteSpecMap.put(partition, new ArrayList<>(specs)));
-        Map<StreamPartition, List<StreamRoutePartitioner>> copyRoutePartitionerMap = new HashMap<>();
-        routePartitionerMap.forEach((partition, partitioners) ->
-            copyRoutePartitionerMap.put(partition, new ArrayList<>(partitioners)));
-
-        // added StreamRouterSpec i.e. there is a new StreamPartition
-        for (StreamRouterSpec spec : added) {
-            List<StreamRouterSpec> current = copyRouteSpecMap.get(spec.getPartition());
-            if (current != null && current.contains(spec)) {
-                LOG.error("Metadata calculation error: add existing StreamRouterSpec " + spec);
-            } else {
-                inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
-            }
-        }
-
-        // removed StreamRouterSpec i.e. there is a deleted StreamPartition
-        for (StreamRouterSpec spec : removed) {
-            List<StreamRouterSpec> current = copyRouteSpecMap.get(spec.getPartition());
-            if (current == null || !current.contains(spec)) {
-                LOG.error("Metadata calculation error: remove non-existing StreamRouterSpec " + spec);
-            } else {
-                inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
-            }
-        }
-
-        // modified StreamRouterSpec, i.e. there is modified StreamPartition, for example WorkSlotQueue assignment is changed
-        for (StreamRouterSpec spec : modified) {
-            List<StreamRouterSpec> current = copyRouteSpecMap.get(spec.getPartition());
-            if (current == null) {
-                LOG.error("Metadata calculation error: modify nonexisting StreamRouterSpec " + spec);
-            } else if (current.contains(spec)) {
-                // The partition is known but an equal spec is already registered: nothing changed.
-                LOG.error("Metadata calculation error: modify StreamRouterSpec without any change " + spec);
-            } else {
-                inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec);
-                inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds);
-            }
-        }
-
-        // switch
-        routeSpecMap = copyRouteSpecMap;
-        routePartitionerMap = copyRoutePartitionerMap;
-    }
-
-    private void inplaceRemove(Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap,
-                               Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
-                               StreamRouterSpec toBeRemoved) {
-        routeSpecMap.remove(toBeRemoved.getPartition());
-        routePartitionerMap.remove(toBeRemoved.getPartition());
-    }
-
-    private void inplaceAdd(Map<StreamPartition, List<StreamRouterSpec>> routeSpecMap,
-                            Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap,
-                            StreamRouterSpec toBeAdded, Map<String, StreamDefinition> sds) {
-        if (!routeSpecMap.containsKey(toBeAdded.getPartition())) {
-            routeSpecMap.put(toBeAdded.getPartition(), new ArrayList<StreamRouterSpec>());
-        }
-        routeSpecMap.get(toBeAdded.getPartition()).add(toBeAdded);
-        try {
-            List<StreamRoutePartitioner> routePartitioners = calculatePartitioner(toBeAdded, sds, routePartitionerMap);
-            routePartitionerMap.put(toBeAdded.getPartition(), routePartitioners);
-        } catch (Exception e) {
-            LOG.error("ignore this failure StreamRouterSpec " + toBeAdded + ", with error" + e.getMessage(), e);
-            routeSpecMap.remove(toBeAdded.getPartition());
-            routePartitionerMap.remove(toBeAdded.getPartition());
-        }
-    }
-
-    private List<StreamRoutePartitioner> calculatePartitioner(StreamRouterSpec streamRouterSpec,
-                                                              Map<String, StreamDefinition> sds,
-                                                              Map<StreamPartition, List<StreamRoutePartitioner>> routePartitionerMap) throws Exception {
-        List<StreamRoutePartitioner> routePartitioners = routePartitionerMap.get(streamRouterSpec.getPartition());
-        if (routePartitioners == null) {
-            routePartitioners = new ArrayList<>();
-        }
-        for (PolicyWorkerQueue pwq : streamRouterSpec.getTargetQueue()) {
-            routePartitioners.add(StreamRoutePartitionFactory.createRoutePartitioner(
-                    Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId),
-                    sds.get(streamRouterSpec.getPartition().getStreamId()),
-                    streamRouterSpec.getPartition()));
-        }
-        return routePartitioners;
-    }
-
-    /**
-     * Counts the event as dropped and acks it on the output collector.
-     *
-     * @param event event to drop
-     * @throws IllegalStateException if the event has no anchor; the drop counter has already
-     *                               been incremented at that point
-     */
-    @Override
-    public void drop(PartitionedEvent event) {
-        synchronized (outputLock) {
-            this.streamContext.counter().incr("drop_count");
-            if (event.getAnchor() == null) {
-                throw new IllegalStateException(event.toString() + " was not acked as anchor is null");
-            }
-            this.outputCollector.ack(event);
-        }
-    }
-
-    /**
-     * Does nothing; this collector has no buffered state to flush.
-     */
-    public void flush() {
-        // intentionally empty
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
deleted file mode 100644
index 41523cc..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.router.impl;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.router.StreamRouter;
-import org.apache.eagle.alert.engine.router.StreamSortHandler;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClockManager;
-import org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl;
-import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockManagerImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * {@link StreamRouter} implementation that hands each event to the {@link StreamSortHandler}
- * registered for its partition, or emits it unchanged when it cannot be sorted, and then
- * reports the event time to the {@link StreamTimeClockManager}.
- *
- * <p>{@link #prepare(StreamContext, PartitionedEventCollector)} must be called before events
- * or sort-spec changes are delivered.
- */
-public class StreamRouterImpl implements StreamRouter {
-    private static final long serialVersionUID = -4640125063690900014L;
-    private static final Logger LOG = LoggerFactory.getLogger(StreamRouterImpl.class);
-    private final String name;
-    // Replaced as a whole by onStreamSortSpecChange; never mutated after being published.
-    private volatile Map<StreamPartition, StreamSortHandler> streamSortHandlers;
-    private PartitionedEventCollector outputCollector;
-    private StreamTimeClockManager streamTimeClockManager;
-    private StreamContext context;
-
-    /**
-     * @param name This name should be formed by topologyId + router id, which is built by topology builder.
-     */
-    public StreamRouterImpl(String name) {
-        this.name = name;
-    }
-
-    public String getName() {
-        return this.name;
-    }
-
-    /**
-     * Closes every registered sort handler and the clock manager.
-     * Safe to call even if {@code prepare} was never invoked.
-     */
-    @Override
-    public void close() {
-        Map<StreamPartition, StreamSortHandler> handlers = streamSortHandlers;
-        if (handlers != null) {
-            handlers.values().forEach(StreamSortHandler::close);
-        }
-        if (streamTimeClockManager != null) {
-            streamTimeClockManager.close();
-        }
-    }
-
-    /**
-     * Initializes the router with a fresh clock manager and an empty handler table.
-     *
-     * @param context         stream context whose counters are updated per event
-     * @param outputCollector collector that receives pass-through events and is given to sort handlers
-     */
-    public void prepare(StreamContext context, PartitionedEventCollector outputCollector) {
-        this.streamTimeClockManager = new StreamTimeClockManagerImpl();
-        this.streamSortHandlers = new HashMap<>();
-        this.outputCollector = outputCollector;
-        this.context = context;
-    }
-
-    /**
-     * Routes one event and then reports its timestamp to the clock manager.
-     *
-     * <p>{@code sort_count} is incremented only for events handed to a sort handler and
-     * {@code direct_count} only for events emitted directly, so the two add up to
-     * {@code receive_count}.
-     *
-     * <p>TODO: Potential improvement: if StreamSortHandler is expensive, we can use DISRUPTOR to buffer.
-     *
-     * @param event StreamEvent
-     */
-    public void nextEvent(PartitionedEvent event) {
-        this.context.counter().incr("receive_count");
-        if (dispatchToSortHandler(event)) {
-            this.context.counter().incr("sort_count");
-        } else {
-            this.context.counter().incr("direct_count");
-            // Pass through directly if no need to sort
-            outputCollector.emit(event);
-        }
-        // Update stream clock time if moving forward and trigger all tick listeners
-        streamTimeClockManager.onTimeUpdate(event.getStreamId(), event.getTimestamp());
-    }
-
-    /**
-     * Hands the event to the sort handler of its partition, if there is one.
-     * Events with a non-positive timestamp are never sorted.
-     *
-     * @param event input event.
-     * @return whether sorted.
-     */
-    private boolean dispatchToSortHandler(PartitionedEvent event) {
-        if (event.getTimestamp() <= 0) {
-            return false;
-        }
-
-        StreamSortHandler sortHandler = streamSortHandlers.get(event.getPartition());
-        if (sortHandler == null) {
-            if (event.isSortRequired()) {
-                LOG.warn("Stream sort handler required has not been loaded so emmit directly: {}", event);
-                this.context.counter().incr("miss_sort_count");
-            }
-            return false;
-        }
-        sortHandler.nextEvent(event);
-        return true;
-    }
-
-    /**
-     * Applies sort-spec changes to a copy of the handler table and publishes the copy with a
-     * single volatile write. The whole update runs while holding the clock manager's monitor.
-     *
-     * @param added   specs for partitions that get a new handler; may be {@code null}
-     * @param removed specs for partitions whose handler is closed and dropped; may be {@code null}
-     * @param changed specs for partitions whose handler is replaced; may be {@code null}
-     */
-    @Override
-    public void onStreamSortSpecChange(Map<StreamPartition, StreamSortSpec> added,
-                                       Map<StreamPartition, StreamSortSpec> removed,
-                                       Map<StreamPartition, StreamSortSpec> changed) {
-        synchronized (streamTimeClockManager) {
-            Map<StreamPartition, StreamSortHandler> copy = new HashMap<>(this.streamSortHandlers);
-
-            // add new StreamSortSpec
-            if (added != null) {
-                for (Entry<StreamPartition, StreamSortSpec> spec : added.entrySet()) {
-                    StreamPartition tmp = spec.getKey();
-                    if (copy.containsKey(tmp)) {
-                        LOG.error("Metadata calculation error: Duplicated StreamSortSpec " + spec);
-                    } else {
-                        StreamSortHandler handler = newPreparedHandler(tmp, spec.getValue());
-                        copy.put(tmp, handler);
-                        streamTimeClockManager.registerListener(streamTimeClockManager.createStreamTimeClock(tmp.getStreamId()), handler);
-                    }
-                }
-            }
-
-            // remove StreamSortSpec
-            if (removed != null) {
-                for (Entry<StreamPartition, StreamSortSpec> spec : removed.entrySet()) {
-                    StreamPartition tmp = spec.getKey();
-                    if (copy.containsKey(tmp)) {
-                        closeAndUnregister(copy.remove(tmp));
-                    } else {
-                        LOG.error("Metadata calculation error: remove nonexisting StreamSortSpec " + spec.getValue());
-                    }
-                }
-            }
-
-            // modify StreamSortSpec
-            if (changed != null) {
-                for (Entry<StreamPartition, StreamSortSpec> spec : changed.entrySet()) {
-                    StreamPartition tmp = spec.getKey();
-                    if (copy.containsKey(tmp)) {
-                        closeAndUnregister(copy.remove(tmp));
-                        StreamSortHandler handler = newPreparedHandler(tmp, spec.getValue());
-                        copy.put(tmp, handler);
-                        // NOTE(review): registers by stream id here but by an explicitly created
-                        // clock in the "added" branch; kept as-is, confirm both are equivalent.
-                        streamTimeClockManager.registerListener(tmp.getStreamId(), handler);
-                    } else {
-                        LOG.error("Metadata calculation error: modify non-existing StreamSortSpec " + spec.getValue());
-                    }
-                }
-            }
-
-            // atomic switch
-            this.streamSortHandlers = copy;
-        }
-    }
-
-    /** Creates a window sort handler for the partition's stream and prepares it with the spec. */
-    private StreamSortHandler newPreparedHandler(StreamPartition partition, StreamSortSpec sortSpec) {
-        StreamSortHandler handler = new StreamSortWindowHandlerImpl();
-        handler.prepare(partition.getStreamId(), sortSpec, this.outputCollector);
-        return handler;
-    }
-
-    /** Closes a handler and then removes it from the clock manager's listeners. */
-    private void closeAndUnregister(StreamSortHandler handler) {
-        handler.close();
-        streamTimeClockManager.removeListener(handler);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
deleted file mode 100644
index ab05b48..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.runner;
-
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializers;
-import org.apache.eagle.alert.utils.AlertConstants;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Base class for the stream bolts of the alert engine.
- *
- * <p>It stores the Storm configuration and collector handed to {@link #prepare}, creates the
- * {@link PartitionedEventSerializer}, delegates further set-up to {@link #internalPrepare} and
- * serves stream definitions from {@link #sdf} as a {@link SerializationMetadataProvider}.
- */
-@SuppressWarnings( {"rawtypes", "serial"})
-public abstract class AbstractStreamBolt extends BaseRichBolt implements SerializationMetadataProvider {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamBolt.class);
-    private IMetadataChangeNotifyService changeNotifyService;
-
-    private Config config;
-    private List<String> outputStreamIds;
-    protected OutputCollector collector;
-    protected Map stormConf;
-
-    private String boltId;
-    protected PartitionedEventSerializer serializer;
-    // Stream definitions looked up by stream id in getStreamDefinition.
-    protected volatile Map<String, StreamDefinition> sdf = new HashMap<>();
-    // Reported in StreamNotDefinedException when a stream id is unknown.
-    protected volatile String specVersion = "Not Initialized";
-    protected volatile boolean specVersionOutofdate = false;
-    protected StreamContext streamContext;
-
-    /**
-     * @param boltId              identifier returned by {@link #getBoltId()}
-     * @param changeNotifyService metadata service; must be non-null by the time {@link #prepare} runs
-     * @param config              configuration returned by {@link #getConfig()} and passed to {@link #internalPrepare}
-     */
-    public AbstractStreamBolt(String boltId, IMetadataChangeNotifyService changeNotifyService, Config config) {
-        this.boltId = boltId;
-        this.changeNotifyService = changeNotifyService;
-        this.config = config;
-    }
-
-    public Config getConfig() {
-        return config;
-    }
-
-    /**
-     * Sets the stream ids that {@link #declareOutputFields} declares.
-     *
-     * @param outputStreamIds stream ids to declare, or {@code null} to declare only the default stream
-     */
-    public void declareOutputStreams(List<String> outputStreamIds) {
-        this.outputStreamIds = outputStreamIds;
-    }
-
-    protected List<String> getOutputStreamIds() {
-        return this.outputStreamIds;
-    }
-
-    /**
-     * Stores the Storm arguments, creates the serializer, runs {@link #internalPrepare} and then
-     * asks the metadata service to start fetching metadata. A failure of that last step is
-     * logged as a warning and not propagated.
-     */
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        Preconditions.checkNotNull(this.changeNotifyService, "IMetadataChangeNotifyService is not set yet");
-        this.stormConf = stormConf;
-        this.collector = collector;
-        this.serializer = Serializers.newPartitionedEventSerializer(this);
-        internalPrepare(collector, this.changeNotifyService, this.config, context);
-        try {
-            this.changeNotifyService.activateFetchMetaData();
-        } catch (Exception e) {
-            LOG.warn(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * Converts a tuple payload into a {@link PartitionedEvent}.
-     *
-     * @param object either a serialized event as {@code byte[]} or an already materialized event
-     * @return the event
-     * @throws IOException           if the serializer fails to decode the byte array
-     * @throws IllegalStateException if the payload is {@code null} or of any other type
-     */
-    protected PartitionedEvent deserialize(Object object) throws IOException {
-        // byte[] in higher priority
-        if (object instanceof byte[]) {
-            return serializer.deserialize((byte[]) object);
-        } else if (object instanceof PartitionedEvent) {
-            return (PartitionedEvent) object;
-        } else {
-            throw new IllegalStateException(String.format("Unsupported event class '%s', expect byte array or PartitionedEvent!", object == null ? null : object.getClass().getCanonicalName()));
-        }
-    }
-
-    /**
-     * subclass should implement more initialization for example.
-     * 1) register metadata change
-     * 2) init stream context
-     *
-     * @param collector       the output collector passed to {@link #prepare}
-     * @param metadataManager the metadata service given to the constructor
-     * @param config          the configuration given to the constructor
-     * @param context         the topology context passed to {@link #prepare}
-     */
-    public abstract void internalPrepare(
-        OutputCollector collector,
-        IMetadataChangeNotifyService metadataManager,
-        Config config, TopologyContext context);
-
-    @Override
-    public void cleanup() {
-        super.cleanup();
-    }
-
-    /**
-     * Declares one single-field stream per configured output stream id, or the default stream
-     * when no output stream ids were set.
-     */
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        if (this.outputStreamIds != null) {
-            LOG.info("declare streams: {} ", outputStreamIds);
-            for (String streamId : this.outputStreamIds) {
-                declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0));
-            }
-        } else {
-            declarer.declare(new Fields(AlertConstants.FIELD_0));
-        }
-    }
-
-    /**
-     * Looks up the definition of a stream.
-     *
-     * @param streamId id of the stream
-     * @return the definition registered for {@code streamId}
-     * @throws StreamNotDefinedException if no definition is registered for the id
-     */
-    @Override
-    public StreamDefinition getStreamDefinition(String streamId) throws StreamNotDefinedException {
-        // Single read of the volatile map: containsKey() followed by get() could observe two
-        // different maps if sdf is replaced in between, and then return null.
-        StreamDefinition definition = sdf.get(streamId);
-        if (definition == null) {
-            throw new StreamNotDefinedException(streamId, specVersion);
-        }
-        return definition;
-    }
-
-    public String getBoltId() {
-        return boltId;
-    }
-}
\ No newline at end of file