You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:46 UTC

[38/84] [partial] eagle git commit: Clean repo for eagle site

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
deleted file mode 100644
index c377e41..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.ListUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.hibernate.validator.constraints.Length;
-
-import java.io.Serializable;
-import java.util.*;
-
-/**
- * @since Apr 5, 2016.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class PolicyDefinition implements Serializable {
-    private static final long serialVersionUID = 377581499339572414L;
-    // unique identifier
-    @Length(min = 1, max = 50, message = "length should between 1 and 50")
-    private String name;
-    private String description;
-    private List<String> inputStreams = new ArrayList<>();
-    private List<String> outputStreams = new ArrayList<>();
-    private String siteId = "default";
-
-    private Definition definition;
-    private Definition stateDefinition;
-    private PolicyStatus policyStatus = PolicyStatus.ENABLED;
-    private AlertDefinition alertDefinition;
-
-    // one stream only have one partition in one policy, since we don't support stream alias
-    private List<StreamPartition> partitionSpec = new ArrayList<StreamPartition>();
-    private boolean dedicated;
-
-    // runtime configuration for policy, these are user-invisible
-    private int parallelismHint = 1;
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public List<String> getInputStreams() {
-        return inputStreams;
-    }
-
-    public void setInputStreams(List<String> inputStreams) {
-        this.inputStreams = inputStreams;
-    }
-
-    public List<String> getOutputStreams() {
-        return outputStreams;
-    }
-
-    public void setOutputStreams(List<String> outputStreams) {
-        this.outputStreams = outputStreams;
-    }
-
-    public Definition getDefinition() {
-        return definition;
-    }
-
-    public Definition getStateDefinition() {
-        return stateDefinition;
-    }
-
-    public void setStateDefinition(Definition stateDefinition) {
-        this.stateDefinition = stateDefinition;
-    }
-
-    public void setDefinition(Definition definition) {
-        this.definition = definition;
-    }
-
-    public List<StreamPartition> getPartitionSpec() {
-        return partitionSpec;
-    }
-
-    public void setPartitionSpec(List<StreamPartition> partitionSpec) {
-        this.partitionSpec = partitionSpec;
-    }
-
-    public void addPartition(StreamPartition par) {
-        this.partitionSpec.add(par);
-    }
-
-    public boolean isDedicated() {
-        return dedicated;
-    }
-
-    public void setDedicated(boolean dedicated) {
-        this.dedicated = dedicated;
-    }
-
-    public int getParallelismHint() {
-        return parallelismHint;
-    }
-
-    public void setParallelismHint(int parallelism) {
-        this.parallelismHint = parallelism;
-    }
-
-    public PolicyStatus getPolicyStatus() {
-        return policyStatus;
-    }
-
-    public void setPolicyStatus(PolicyStatus policyStatus) {
-        this.policyStatus = policyStatus;
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder()
-                .append(siteId)
-                .append(name)
-                .append(inputStreams)
-                .append(outputStreams)
-                .append(definition)
-                .append(partitionSpec)
-                .append(policyStatus)
-                .append(parallelismHint)
-                .append(alertDefinition)
-                .build();
-    }
-
-    @Override
-    public boolean equals(Object that) {
-        if (that == this) {
-            return true;
-        }
-
-        if (!(that instanceof PolicyDefinition)) {
-            return false;
-        }
-
-        PolicyDefinition another = (PolicyDefinition) that;
-
-        if (Objects.equals(another.siteId, this.siteId)
-                && Objects.equals(another.name, this.name)
-                && Objects.equals(another.description, this.description)
-                && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams)
-                && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams)
-                && (another.definition != null && another.definition.equals(this.definition))
-                && Objects.equals(this.definition, another.definition)
-                && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
-                && another.policyStatus.equals(this.policyStatus)
-                && another.parallelismHint == this.parallelismHint
-                && Objects.equals(another.alertDefinition, alertDefinition)) {
-            return true;
-        }
-        return false;
-    }
-
-    public AlertDefinition getAlertDefinition() {
-        return alertDefinition;
-    }
-
-    public void setAlertDefinition(AlertDefinition alertDefinition) {
-        this.alertDefinition = alertDefinition;
-    }
-
-    public AlertSeverity getAlertSeverity() {
-        return alertDefinition == null ? null : alertDefinition.getSeverity();
-    }
-
-    public String getAlertCategory() {
-        return alertDefinition == null ? null : alertDefinition.getCategory();
-    }
-
-    public String getSiteId() {
-        return siteId;
-    }
-
-    public void setSiteId(String siteId) {
-        this.siteId = siteId;
-    }
-
-    @JsonIgnoreProperties(ignoreUnknown = true)
-    public static class Definition implements Serializable {
-        private static final long serialVersionUID = -622366527887848346L;
-
-        public String type;
-        public String value;
-        public String handlerClass;
-        public Map<String, Object> properties = new HashMap<>();
-
-        private List<String> inputStreams = new ArrayList<String>();
-        private List<String> outputStreams = new ArrayList<String>();
-
-        public Definition(String type, String value) {
-            this.type = type;
-            this.value = value;
-        }
-
-        public Definition() {
-            this.type = null;
-            this.value = null;
-        }
-
-        @Override
-        public int hashCode() {
-            return new HashCodeBuilder().append(type).append(value).build();
-        }
-
-        @Override
-        public boolean equals(Object that) {
-            if (that == this) {
-                return true;
-            }
-            if (!(that instanceof Definition)) {
-                return false;
-            }
-            Definition another = (Definition) that;
-            if (another.type.equals(this.type)
-                    && another.value.equals(this.value)
-                    && ListUtils.isEqualList(another.inputStreams, this.inputStreams)
-                    && ListUtils.isEqualList(another.outputStreams, this.outputStreams)) {
-                return true;
-            }
-            return false;
-        }
-
-        public String getType() {
-            return type;
-        }
-
-        public void setType(String type) {
-            this.type = type;
-        }
-
-        public String getValue() {
-            return value;
-        }
-
-        public void setValue(String value) {
-            this.value = value;
-        }
-
-        public void setInputStreams(List<String> inputStreams) {
-            this.inputStreams = inputStreams;
-        }
-
-        public void setOutputStreams(List<String> outputStreams) {
-            this.outputStreams = outputStreams;
-        }
-
-        public List<String> getInputStreams() {
-            return inputStreams;
-        }
-
-        public List<String> getOutputStreams() {
-            return outputStreams;
-        }
-
-        public String getHandlerClass() {
-            return handlerClass;
-        }
-
-        public void setHandlerClass(String handlerClass) {
-            this.handlerClass = handlerClass;
-        }
-
-        public Map<String, Object> getProperties() {
-            return properties;
-        }
-
-        public void setProperties(Map<String, Object> properties) {
-            this.properties = properties;
-        }
-
-        @Override
-        public String toString() {
-            return String.format("{type=\"%s\",value=\"%s\", inputStreams=\"%s\", outputStreams=\"%s\" }", type, value, inputStreams, outputStreams);
-        }
-    }
-
-    public static enum PolicyStatus {
-        ENABLED, DISABLED
-    }
-
-    @Override
-    public String toString() {
-        return String.format("{site=\"%s\", name=\"%s\",definition=%s}", this.getSiteId(), this.getName(), this.getDefinition() == null ? "null" : this.getDefinition().toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java
deleted file mode 100644
index 7e57f88..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishPartition.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.base.Objects;
-
-public class PublishPartition implements Serializable {
-
-    private static final long serialVersionUID = 2524776632955586234L;
-
-    private String policyId;
-    private String streamId;
-    private String publishId;
-    private Set<String> columns = new HashSet<>();
-
-    @JsonIgnore
-    private Set<Object> columnValues = new HashSet<>();
-
-    public PublishPartition() {
-    }
-
-    public PublishPartition(String streamId, String policyId, String publishId, Set<String> columns) {
-        this.streamId = streamId;
-        this.policyId = policyId;
-        this.publishId = publishId;
-        if (columns != null) {
-            this.columns = columns;
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder().append(streamId).append(policyId).append(publishId).append(columns).append(columnValues).build();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        return obj instanceof PublishPartition
-            && Objects.equal(this.streamId, ((PublishPartition) obj).getStreamId())
-            && Objects.equal(this.policyId, ((PublishPartition) obj).getPolicyId())
-            && Objects.equal(this.publishId, ((PublishPartition) obj).getPublishId())
-            && CollectionUtils.isEqualCollection(this.columns, ((PublishPartition) obj).getColumns())
-            && CollectionUtils.isEqualCollection(this.columnValues, ((PublishPartition) obj).getColumnValues());
-    }
-
-    @Override
-    public String toString() {
-        return String.format("PublishPartition[policyId=%s,streamId=%s,publishId=%s,columns=%s,columnValues=%s]",
-            policyId, streamId, publishId, columns, columnValues);
-    }
-
-    @Override
-    public PublishPartition clone() {
-        return new PublishPartition(this.streamId, this.policyId, this.publishId, new HashSet<>(this.columns));
-    }
-
-    public String getPolicyId() {
-        return policyId;
-    }
-
-    public void setPolicyId(String policyId) {
-        this.policyId = policyId;
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    public String getPublishId() {
-        return publishId;
-    }
-
-    public void setPublishId(String publishId) {
-        this.publishId = publishId;
-    }
-
-    public Set<String> getColumns() {
-        return columns;
-    }
-
-    public void setColumns(Set<String> columns) {
-        this.columns = columns;
-    }
-
-    @JsonIgnore
-    public Set<Object> getColumnValues() {
-        return columnValues;
-    }
-
-    @JsonIgnore
-    public void setColumnValues(Set<Object> columnValues) {
-        this.columnValues = columnValues;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
deleted file mode 100644
index 74a3d69..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * @since Apr 11, 2016.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Publishment {
-
-    public static final String STREAM_NAME_DEFAULT = "_default";
-
-    private String name;
-    private String type;
-    private List<String> policyIds;
-    private List<String> streamIds;
-    private String dedupIntervalMin;
-    private List<String> dedupFields;
-    private String dedupStateField;
-    private String dedupStateCloseValue;
-    private OverrideDeduplicatorSpec overrideDeduplicator;
-    private Map<String, Object> properties;
-    // the class name to extend the IEventSerializer interface
-    private String serializer;
-
-    private Set<String> partitionColumns = new HashSet<>();
-
-    public Set<String> getPartitionColumns() {
-        return partitionColumns;
-    }
-
-    public void setPartitionColumns(Set<String> partitionColumns) {
-        this.partitionColumns = partitionColumns;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getDedupStateField() {
-        return dedupStateField;
-    }
-
-    public void setDedupStateField(String dedupStateField) {
-        this.dedupStateField = dedupStateField;
-    }
-
-    public String getDedupStateCloseValue() {
-        return dedupStateCloseValue;
-    }
-
-    public void setDedupStateCloseValue(String dedupStateCloseValue) {
-        this.dedupStateCloseValue = dedupStateCloseValue;
-    }
-
-    public OverrideDeduplicatorSpec getOverrideDeduplicator() {
-        return overrideDeduplicator;
-    }
-
-    public void setOverrideDeduplicator(OverrideDeduplicatorSpec overrideDeduplicator) {
-        this.overrideDeduplicator = overrideDeduplicator;
-    }
-
-    public String getSerializer() {
-        return serializer;
-    }
-
-    public void setSerializer(String serializer) {
-        this.serializer = serializer;
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public List<String> getPolicyIds() {
-        return policyIds;
-    }
-
-    public void setPolicyIds(List<String> policyIds) {
-        this.policyIds = policyIds;
-    }
-
-    public List<String> getStreamIds() {
-        return streamIds;
-    }
-
-    public void setStreamIds(List<String> streamIds) {
-        this.streamIds = streamIds;
-    }
-
-    public String getDedupIntervalMin() {
-        return dedupIntervalMin;
-    }
-
-    public void setDedupIntervalMin(String dedupIntervalMin) {
-        this.dedupIntervalMin = dedupIntervalMin;
-    }
-
-    public List<String> getDedupFields() {
-        return dedupFields;
-    }
-
-    public void setDedupFields(List<String> dedupFields) {
-        this.dedupFields = dedupFields;
-    }
-
-    public Map<String, Object> getProperties() {
-        return properties;
-    }
-
-    public void setProperties(Map<String, Object> properties) {
-        this.properties = properties;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof Publishment) {
-            Publishment p = (Publishment) obj;
-            return (Objects.equals(name, p.getName()) && Objects.equals(type, p.getType())
-                && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
-                && Objects.equals(dedupFields, p.getDedupFields())
-                && Objects.equals(dedupStateField, p.getDedupStateField())
-                && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator())
-                && Objects.equals(policyIds, p.getPolicyIds())
-                && Objects.equals(streamIds, p.getStreamIds())
-                && properties.equals(p.getProperties()));
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(dedupFields)
-            .append(dedupStateField).append(overrideDeduplicator).append(policyIds).append(streamIds)
-            .append(properties).build();
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:")
-            .append(policyIds).append(",properties:").append(properties);
-        return sb.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
deleted file mode 100644
index f7025f2..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import java.util.*;
-
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class PublishmentType {
-    private String name;
-
-    @Override
-    public String toString() {
-        return "PublishmentType{"
-                + "name='" + name + '\''
-                + ", type='" + type + '\''
-                + ", description='" + description + '\''
-                + ", fields=" + fields
-                + '}';
-    }
-
-    private String type;
-    private String description;
-    private List<Map<String, String>> fields = new LinkedList<>();
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public List<Map<String, String>> getFields() {
-        return fields;
-    }
-
-    public void setFields(List<Map<String, String>> fields) {
-        this.fields = fields;
-    }
-
-
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof PublishmentType) {
-            PublishmentType p = (PublishmentType) obj;
-            return (Objects.equals(name, p.name)
-                && Objects.equals(type, p.type)
-                && Objects.equals(description, p.getDescription())
-                && Objects.equals(fields, p.getFields()));
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder()
-            .append(name)
-            .append(type)
-            .append(description)
-            .append(fields)
-            .build();
-    }
-
-
-    public static class Builder {
-        private final PublishmentType publishmentType;
-
-        public Builder() {
-            this.publishmentType = new PublishmentType();
-        }
-
-        public Builder type(Class<?> typeClass) {
-            this.publishmentType.setType(typeClass.getName());
-            return this;
-        }
-
-        public Builder name(String name) {
-            this.publishmentType.setName(name);
-            return this;
-        }
-
-        public Builder description(String description) {
-            this.publishmentType.setDescription(description);
-            return this;
-        }
-
-        public Builder field(Map<String,String> fieldDesc) {
-            this.publishmentType.getFields().add(fieldDesc);
-            return this;
-        }
-
-        public Builder field(String name, String value) {
-            this.publishmentType.getFields().add(new HashMap<String,String>() {
-                {
-                    put("name", name);
-                    put("value", value);
-                }
-            });
-            return this;
-        }
-
-        public Builder field(String name) {
-            this.publishmentType.getFields().add(new HashMap<String,String>() {
-                {
-                    put("name", name);
-                }
-            });
-            return this;
-        }
-
-        public PublishmentType build() {
-            return this.publishmentType;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
deleted file mode 100644
index abd9dc5..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Objects;
-
-import javax.xml.bind.annotation.adapters.XmlAdapter;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-public class StreamColumn implements Serializable {
-
-    private static final long serialVersionUID = -5457861313624389106L;
-    private String name;
-    private Type type;
-    private Object defaultValue;
-    private boolean required = true;
-    private String description;
-    private String nodataExpression;
-
-    public String toString() {
-        return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]",
-            name, type, defaultValue, required, nodataExpression);
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder()
-            .append(this.name)
-            .append(this.type)
-            .append(this.defaultValue)
-            .append(this.required)
-            .append(this.description)
-            .append(this.nodataExpression)
-            .build();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (!(obj instanceof StreamColumn)) {
-            return false;
-        }
-        return Objects.equals(this.name, ((StreamColumn) obj).name)
-            && Objects.equals(this.type, ((StreamColumn) obj).type)
-            && Objects.equals(this.defaultValue, ((StreamColumn) obj).defaultValue)
-            && Objects.equals(this.required, ((StreamColumn) obj).required)
-            && Objects.equals(this.description, ((StreamColumn) obj).description)
-            && Objects.equals(this.nodataExpression, ((StreamColumn) obj).nodataExpression);
-    }
-
-    public String getNodataExpression() {
-        return nodataExpression;
-    }
-
-    public void setNodataExpression(String nodataExpression) {
-        this.nodataExpression = nodataExpression;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    @XmlJavaTypeAdapter(StreamColumnTypeAdapter.class)
-    public Type getType() {
-        return type;
-    }
-
-    public void setType(Type type) {
-        this.type = type;
-    }
-
-    @XmlJavaTypeAdapter(value = DefaultValueAdapter.class)
-    public Object getDefaultValue() {
-        return defaultValue;
-    }
-
-    private void ensureDefaultValueType() {
-        if (this.getDefaultValue() != null && (this.getDefaultValue() instanceof String) && this.getType() != Type.STRING) {
-            switch (this.getType()) {
-                case INT:
-                    this.setDefaultValue(Integer.valueOf((String) this.getDefaultValue()));
-                    break;
-                case LONG:
-                    this.setDefaultValue(Long.valueOf((String) this.getDefaultValue()));
-                    break;
-                case FLOAT:
-                    this.setDefaultValue(Float.valueOf((String) this.getDefaultValue()));
-                    break;
-                case DOUBLE:
-                    this.setDefaultValue(Double.valueOf((String) this.getDefaultValue()));
-                    break;
-                case BOOL:
-                    this.setDefaultValue(Boolean.valueOf((String) this.getDefaultValue()));
-                    break;
-                case OBJECT:
-                    try {
-                        this.setDefaultValue(new ObjectMapper().readValue((String) this.getDefaultValue(), HashMap.class));
-                    } catch (IOException e) {
-                        throw new IllegalArgumentException(e);
-                    }
-                    break;
-                default:
-                    throw new IllegalArgumentException("Illegal type: " + this.getType());
-            }
-        }
-    }
-
-    public void setDefaultValue(Object defaultValue) {
-        this.defaultValue = defaultValue;
-        ensureDefaultValueType();
-    }
-
-    public boolean isRequired() {
-        return required;
-    }
-
-    public void setRequired(boolean required) {
-        this.required = required;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public enum Type implements Serializable {
-        STRING("string"), INT("int"), LONG("long"), FLOAT("float"), DOUBLE("double"), BOOL("bool"), OBJECT("object");
-
-        private final String name;
-
-        Type(String name) {
-            this.name = name;
-        }
-
-        @Override
-        public String toString() {
-            return name;
-        }
-
-        @com.fasterxml.jackson.annotation.JsonCreator
-        public static Type getEnumFromValue(String value) {
-            for (Type testEnum : values()) {
-                if (testEnum.name.equalsIgnoreCase(value)) {
-                    return testEnum;
-                }
-            }
-            throw new IllegalArgumentException();
-        }
-    }
-
-    public static class StreamColumnTypeAdapter extends XmlAdapter<String, Type> {
-
-        @Override
-        public Type unmarshal(String v) throws Exception {
-            return Type.getEnumFromValue(v);
-        }
-
-        @Override
-        public String marshal(Type v) throws Exception {
-            return v.name;
-        }
-    }
-
-    public static class DefaultValueAdapter extends XmlAdapter<String, Object> {
-        @Override
-        public Object unmarshal(String v) throws Exception {
-            return v;
-        }
-
-        @Override
-        public String marshal(Object v) throws Exception {
-            return v.toString();
-        }
-    }
-
-    public static class Builder {
-        private StreamColumn column;
-
-        public Builder() {
-            column = new StreamColumn();
-        }
-
-        public Builder name(String name) {
-            column.setName(name);
-            return this;
-        }
-
-        public Builder type(Type type) {
-            column.setType(type);
-            return this;
-        }
-
-        public Builder defaultValue(Object defaultValue) {
-            column.setDefaultValue(defaultValue);
-            return this;
-        }
-
-        public Builder required(boolean required) {
-            column.setRequired(required);
-            return this;
-        }
-
-        public StreamColumn build() {
-            return column;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
deleted file mode 100644
index af9d137..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlElementWrapper;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * This is actually a data source schema.
- *
- * @since Apr 5, 2016
- */
-public class StreamDefinition implements Serializable {
-    private static final long serialVersionUID = 2352202882328931825L;
-
-    // Stream unique ID
-    private String streamId;
-
-    // Stream description
-    private String description;
-
-    // Is validateable or not
-    private boolean validate = true;
-
-    // Is timeseries-based stream or not
-    private boolean timeseries;
-
-    // TODO: Decouple dataSource and siteId from stream definition
-
-    // Stream data source ID
-    private String dataSource;
-
-    private String group = "global";
-
-    //
-    private String streamSource;
-
-    // Tenant (Site) ID
-    private String siteId;
-
-    private List<StreamColumn> columns = new ArrayList<>();
-
-    public String toString() {
-        return String.format("StreamDefinition[group=%s, streamId=%s, dataSource=%s, description=%s, validate=%s, timeseries=%s, columns=%s",
-            group,
-            streamId,
-            dataSource,
-            description,
-            validate,
-            timeseries,
-            columns
-        );
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder()
-                .append(this.streamId)
-                .append(this.group)
-                .append(this.description)
-                .append(this.validate)
-                .append(this.timeseries)
-                .append(this.dataSource)
-                .append(streamSource)
-                .append(this.siteId)
-                .append(this.columns)
-                .build();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (!(obj instanceof StreamDefinition)) {
-            return false;
-        }
-        StreamDefinition streamDefinition = (StreamDefinition) obj;
-        return Objects.equals(this.streamId, streamDefinition.streamId)
-            && Objects.equals(this.group, streamDefinition.group)
-            && Objects.equals(this.description, streamDefinition.description)
-            && Objects.equals(this.validate, streamDefinition.validate)
-            && Objects.equals(this.timeseries, streamDefinition.timeseries)
-            && Objects.equals(this.dataSource, streamDefinition.dataSource)
-            && Objects.equals(this.streamSource, streamDefinition.streamSource)
-            && Objects.equals(this.siteId, streamDefinition.siteId)
-            && CollectionUtils.isEqualCollection(this.columns, streamDefinition.columns);
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    @Deprecated
-    public boolean isValidate() {
-        return validate;
-    }
-
-    public void setValidate(boolean validate) {
-        this.validate = validate;
-    }
-
-    public boolean isTimeseries() {
-        return timeseries;
-    }
-
-    public void setTimeseries(boolean timeseries) {
-        this.timeseries = timeseries;
-    }
-
-    @XmlElementWrapper(name = "columns")
-    @XmlElement(name = "column")
-    public List<StreamColumn> getColumns() {
-        return columns;
-    }
-
-    public void setColumns(List<StreamColumn> columns) {
-        this.columns = columns;
-    }
-
-    public String getDataSource() {
-        return dataSource;
-    }
-
-    public void setDataSource(String dataSource) {
-        this.dataSource = dataSource;
-    }
-
-    public int getColumnIndex(String column) {
-        int i = 0;
-        for (StreamColumn col : this.getColumns()) {
-            if (col.getName().equals(column)) {
-                return i;
-            }
-            i++;
-        }
-        return -1;
-    }
-
-    public String getSiteId() {
-        return siteId;
-    }
-
-    public void setSiteId(String siteId) {
-        this.siteId = siteId;
-    }
-
-    public String getStreamSource() {
-        return streamSource;
-    }
-
-    public void setStreamSource(String streamSource) {
-        this.streamSource = streamSource;
-    }
-
-    public StreamDefinition copy() {
-        StreamDefinition copied = new StreamDefinition();
-        copied.setColumns(this.getColumns());
-        copied.setDataSource(this.getDataSource());
-        copied.setDescription(this.getDescription());
-        copied.setSiteId(this.getSiteId());
-        copied.setStreamId(this.getStreamId());
-        copied.setGroup(this.getGroup());
-        copied.setTimeseries(this.isTimeseries());
-        copied.setValidate(this.isValidate());
-        return copied;
-    }
-
-    public String getGroup() {
-        return group;
-    }
-
-    public void setGroup(String group) {
-        this.group = group;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
deleted file mode 100644
index 0987463..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import java.io.Serializable;
-import java.util.*;
-
-/**
- * StreamPartition defines how a data stream is partitioned and sorted
- * streamId is used for distinguishing different streams which are spawned from the same data source
- * type defines how to partition data among slots within one slotqueue
- * columns are fields based on which stream is grouped
- * sortSpec defines how data is sorted.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class StreamPartition implements Serializable {
-    private static final long serialVersionUID = -3361648309136926040L;
-
-    private String streamId;
-    private Type type;
-    private List<String> columns = new ArrayList<>();
-    private StreamSortSpec sortSpec;
-
-    public StreamPartition() {
-    }
-
-    public StreamPartition(StreamPartition o) {
-        this.streamId = o.streamId;
-        this.type = o.type;
-        this.columns = new ArrayList<String>(o.columns);
-        this.sortSpec = o.sortSpec == null ? null : new StreamSortSpec(o.sortSpec);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (other == this) {
-            return true;
-        }
-        if (!(other instanceof StreamPartition)) {
-            return false;
-        }
-        StreamPartition sp = (StreamPartition) other;
-        return Objects.equals(streamId, sp.streamId) && Objects.equals(type, sp.type)
-            && CollectionUtils.isEqualCollection(columns, sp.columns) && Objects.equals(sortSpec, sp.sortSpec);
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder().append(streamId).append(type).append(columns).append(sortSpec).build();
-    }
-
-    public void setType(Type type) {
-        this.type = type;
-    }
-
-    public Type getType() {
-        return this.type;
-    }
-
-    public enum Type {
-        GLOBAL("GLOBAL", 0), GROUPBY("GROUPBY", 1), SHUFFLE("SHUFFLE", 2);
-        private final String name;
-        private final int index;
-
-        Type(String name, int index) {
-            this.name = name;
-            this.index = index;
-        }
-
-        @Override
-        public String toString() {
-            return this.name;
-        }
-
-        public static Type locate(String type) {
-            Type _type = _NAME_TYPE.get(type.toUpperCase());
-            if (_type == null) {
-                throw new IllegalStateException("Illegal type name: " + type);
-            }
-            return _type;
-        }
-
-        public static Type locate(int index) {
-            Type _type = _INDEX_TYPE.get(index);
-            if (_type == null) {
-                throw new IllegalStateException("Illegal type index: " + index);
-            }
-            return _type;
-        }
-
-        private static final Map<String, Type> _NAME_TYPE = new HashMap<>();
-        private static final Map<Integer, Type> _INDEX_TYPE = new TreeMap<>();
-
-        static {
-            _NAME_TYPE.put(GLOBAL.name, GLOBAL);
-            _NAME_TYPE.put(GROUPBY.name, GROUPBY);
-            _NAME_TYPE.put(SHUFFLE.name, SHUFFLE);
-
-            _INDEX_TYPE.put(GLOBAL.index, GLOBAL);
-            _INDEX_TYPE.put(GROUPBY.index, GLOBAL);
-            _INDEX_TYPE.put(SHUFFLE.index, GLOBAL);
-        }
-    }
-
-    public List<String> getColumns() {
-        return columns;
-    }
-
-    public void setColumns(List<String> columns) {
-        this.columns = columns;
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    public StreamSortSpec getSortSpec() {
-        return sortSpec;
-    }
-
-    public void setSortSpec(StreamSortSpec sortSpec) {
-        this.sortSpec = sortSpec;
-    }
-
-    @Override
-    public String toString() {
-        return String.format("StreamPartition[streamId=%s,type=%s,columns=[%s],sortSpec=[%s]]", this.getStreamId(), this.getType(), StringUtils.join(this.getColumns(), ","), sortSpec);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
deleted file mode 100644
index ff05fc8..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.joda.time.Period;
-
-import java.io.Serializable;
-
-/**
- * streamId is the key.
- */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class StreamSortSpec implements Serializable {
-    private static final long serialVersionUID = 3626506441441584937L;
-    private String windowPeriod = "";
-    private int windowMargin = 30 * 1000; // 30 seconds by default
-
-    public StreamSortSpec() {
-    }
-
-    public StreamSortSpec(StreamSortSpec spec) {
-        this.windowPeriod = spec.windowPeriod;
-        this.windowMargin = spec.windowMargin;
-    }
-
-    public String getWindowPeriod() {
-        return windowPeriod;
-    }
-
-    public int getWindowPeriodMillis() {
-        if (StringUtils.isNotBlank(windowPeriod)) {
-            return TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(windowPeriod));
-        } else {
-            return 0;
-        }
-    }
-
-    public void setWindowPeriod(String windowPeriod) {
-        this.windowPeriod = windowPeriod;
-    }
-
-    public void setWindowPeriodMillis(int windowPeriodMillis) {
-        this.windowPeriod = Period.millis(windowPeriodMillis).toString();
-    }
-
-    public void setWindowPeriod2(Period period) {
-        this.windowPeriod = period.toString();
-    }
-
-
-    public int getWindowMargin() {
-        return windowMargin;
-    }
-
-    public void setWindowMargin(int windowMargin) {
-        this.windowMargin = windowMargin;
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder()
-                .append(windowPeriod)
-                .append(windowMargin)
-                .toHashCode();
-    }
-
-    @Override
-    public boolean equals(Object that) {
-        if (this == that) {
-            return true;
-        }
-        if (!(that instanceof StreamSortSpec)) {
-            return false;
-        }
-
-        StreamSortSpec another = (StreamSortSpec) that;
-        return
-                another.windowPeriod.equals(this.windowPeriod)
-                        && another.windowMargin == this.windowMargin;
-    }
-
-    @Override
-    public String toString() {
-        return String.format("StreamSortSpec[windowPeriod=%s,windowMargin=%d]",
-                this.getWindowPeriod(),
-                this.getWindowMargin());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
deleted file mode 100644
index 1e40309..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import java.util.Map;
-
-/**
- * @since Apr 5, 2016.
- */
-public class StreamingCluster {
-    public static enum StreamingType {
-        STORM
-    }
-
-    @JsonProperty
-    private String name;
-    @JsonProperty
-    private String zone;
-    @JsonProperty
-    private StreamingType type;
-    @JsonProperty
-    private String description;
-    /**
-     * key - nimbus for storm.
-     */
-    @JsonProperty
-    private Map<String, String> deployments;
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getZone() {
-        return zone;
-    }
-
-    public void setZone(String zone) {
-        this.zone = zone;
-    }
-
-    public StreamingType getType() {
-        return type;
-    }
-
-    public void setType(StreamingType type) {
-        this.type = type;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public Map<String, String> getDeployments() {
-        return deployments;
-    }
-
-    public void setDeployments(Map<String, String> deployments) {
-        this.deployments = deployments;
-    }
-
-    public static final String NIMBUS_HOST = "nimbusHost";
-    public static final String NIMBUS_THRIFT_PORT = "nimbusThriftPort";
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
deleted file mode 100644
index a794e49..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *  <p/>
- *  http://www.apache.org/licenses/LICENSE-2.0
- *  <p/>
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.model;
-
-import com.google.common.base.Preconditions;
-import org.apache.eagle.common.DateTimeUtil;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Use as final rich alert event.
- */
-public class AlertPublishEvent {
-    private String alertId;
-    private String siteId;
-    private List<String> appIds;
-    private String policyId;
-    private String policyValue;
-    private long alertTimestamp;
-    private Map<String, Object> alertData;
-    private String alertSubject;
-    private String alertBody;
-    private String streamId;
-    private String createdBy;
-    private long createdTime;
-
-    public static final String ALERT_ID_KEY = "alertId";
-    public static final String SITE_ID_KEY = "siteId";
-    public static final String APP_IDS_KEY = "appIds";
-    public static final String POLICY_ID_KEY = "policyId";
-    public static final String POLICY_VALUE_KEY = "policyValue";
-    public static final String ALERT_CATEGORY = "category";
-    public static final String ALERT_SEVERITY = "severity";
-
-    public String getAlertId() {
-        return alertId;
-    }
-
-    public void setAlertId(String alertId) {
-        this.alertId = alertId;
-    }
-
-    public List<String> getAppIds() {
-        return appIds;
-    }
-
-    public void setAppIds(List<String> appIds) {
-        this.appIds = appIds;
-    }
-
-    public String getPolicyValue() {
-        return policyValue;
-    }
-
-    public void setPolicyValue(String policyValue) {
-        this.policyValue = policyValue;
-    }
-
-    public long getAlertTimestamp() {
-        return alertTimestamp;
-    }
-
-    public void setAlertTimestamp(long alertTimestamp) {
-        this.alertTimestamp = alertTimestamp;
-    }
-
-    public String getSiteId() {
-        return siteId;
-    }
-
-    public void setSiteId(String siteId) {
-        this.siteId = siteId;
-    }
-
-
-    public String getPolicyId() {
-        return policyId;
-    }
-
-    public void setPolicyId(String policyId) {
-        this.policyId = policyId;
-    }
-
-    public Map<String, Object> getAlertData() {
-        return alertData;
-    }
-
-    public void setAlertData(Map<String, Object> alertData) {
-        this.alertData = alertData;
-    }
-
-    public static AlertPublishEvent createAlertPublishEvent(AlertStreamEvent event) {
-        Preconditions.checkNotNull(event.getAlertId(), "alertId is not initialized before being published: " + event.toString());
-        AlertPublishEvent alertEvent = new AlertPublishEvent();
-        alertEvent.setAlertId(event.getAlertId());
-        alertEvent.setPolicyId(event.getPolicyId());
-        alertEvent.setAlertTimestamp(event.getCreatedTime());
-        alertEvent.setStreamId(event.getStreamId());
-        alertEvent.setCreatedBy(event.getCreatedBy());
-        alertEvent.setCreatedTime(event.getCreatedTime());
-        alertEvent.setAlertSubject(event.getSubject());
-        alertEvent.setAlertBody(event.getBody());
-        if (event.getContext() != null && !event.getContext().isEmpty()) {
-            if (event.getContext().containsKey(SITE_ID_KEY)) {
-                alertEvent.setSiteId(event.getContext().get(SITE_ID_KEY).toString());
-            }
-            if (event.getContext().containsKey(POLICY_VALUE_KEY)) {
-                alertEvent.setPolicyValue(event.getContext().get(POLICY_VALUE_KEY).toString());
-            }
-            if (event.getContext().containsKey(APP_IDS_KEY)) {
-                alertEvent.setAppIds((List<String>) event.getContext().get(APP_IDS_KEY));
-            }
-        }
-        alertEvent.setAlertData(event.getDataMap());
-        return alertEvent;
-    }
-
-    public String toString() {
-        return String.format("%s %s alertId=%s, siteId=%s, policyId=%s, alertData=%s",
-                DateTimeUtil.millisecondsToHumanDateWithSeconds(alertTimestamp),
-                DateTimeUtil.CURRENT_TIME_ZONE.getID(),
-                alertId,
-                siteId,
-                policyId,
-                alertData.toString());
-    }
-
-    public String getAlertSubject() {
-        return alertSubject;
-    }
-
-    public void setAlertSubject(String alertSubject) {
-        this.alertSubject = alertSubject;
-    }
-
-    public String getAlertBody() {
-        return alertBody;
-    }
-
-    public void setAlertBody(String alertBody) {
-        this.alertBody = alertBody;
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    public String getCreatedBy() {
-        return createdBy;
-    }
-
-    public void setCreatedBy(String createdBy) {
-        this.createdBy = createdBy;
-    }
-
-    public long getCreatedTime() {
-        return createdTime;
-    }
-
-    public void setCreatedTime(long createdTime) {
-        this.createdTime = createdTime;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
deleted file mode 100644
index 00170df..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.model;
-
-import org.apache.eagle.alert.engine.coordinator.AlertSeverity;
-import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.*;
-
-/**
- * streamId stands for alert type instead of source event streamId.
- */
-public class AlertStreamEvent extends StreamEvent {
-    private static final long serialVersionUID = 2392131134670106397L;
-
-    private String siteId;
-    private String alertId;
-    private String policyId;
-    private StreamDefinition schema;
-    private String createdBy;
-    private long createdTime;
-    private String category;
-    private AlertSeverity severity = AlertSeverity.WARNING;
-
-    // ----------------------
-    // Lazy Alert Fields
-    // ----------------------
-
-    // Dynamical context like app related fields
-    private Map<String, Object> context;
-    // Alert content like subject and body
-    private String subject;
-    private String body;
-
-    public AlertStreamEvent() {
-    }
-
-    public AlertStreamEvent(AlertStreamEvent event) {
-        this.siteId = event.getSiteId();
-        this.alertId = event.getAlertId();
-        this.policyId = event.policyId;
-        this.schema = event.schema;
-        this.createdBy = event.createdBy;
-        this.createdTime = event.createdTime;
-        this.setTimestamp(event.getTimestamp());
-        this.setData(new Object[event.data.length]);
-        System.arraycopy(event.data, 0, this.data, 0, event.data.length);
-        this.setStreamId(event.getStreamId());
-        this.setMetaVersion(event.getMetaVersion());
-    }
-
-    public void setPolicyId(String policyId) {
-        this.policyId = policyId;
-    }
-
-    public String getPolicyId() {
-        return policyId;
-    }
-
-    @Override
-    public String toString() {
-        List<String> dataStrings = new ArrayList<>(this.getData().length);
-        for (Object obj : this.getData()) {
-            if (obj != null) {
-                dataStrings.add(obj.toString());
-            } else {
-                dataStrings.add(null);
-            }
-        }
-
-        return String.format("Alert {site=%s, stream=%s,timestamp=%s,data=%s, policyId=%s, createdBy=%s, metaVersion=%s}",
-            this.getSiteId(),
-            this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
-            this.getDataMap(), this.getPolicyId(), this.getCreatedBy(), this.getMetaVersion());
-    }
-
-    public String getCreatedBy() {
-        return createdBy;
-    }
-
-    public void setCreatedBy(String createdBy) {
-        this.createdBy = createdBy;
-    }
-
-    public StreamDefinition getSchema() {
-        return schema;
-    }
-
-    public void setSchema(StreamDefinition schema) {
-        this.schema = schema;
-    }
-
-    public long getCreatedTime() {
-        return createdTime;
-    }
-
-    public void setCreatedTime(long createdTime) {
-        this.createdTime = createdTime;
-    }
-
-    public Map<String, Object> getDataMap() {
-        Map<String, Object> event = new HashMap<>();
-        for (StreamColumn column : schema.getColumns()) {
-            Object obj = this.getData()[schema.getColumnIndex(column.getName())];
-            if (obj == null) {
-                event.put(column.getName(), null);
-                continue;
-            }
-            event.put(column.getName(), obj);
-        }
-        return event;
-    }
-
-    public Map<String, Object> getContext() {
-        return context;
-    }
-
-    public void setContext(Map<String, Object> context) {
-        this.context = context;
-    }
-
-    public String getAlertId() {
-        ensureAlertId();
-        return alertId;
-    }
-
-    public void ensureAlertId() {
-        if (this.alertId == null) {
-            this.alertId = UUID.randomUUID().toString();
-        }
-    }
-
-    public String getSubject() {
-        return subject;
-    }
-
-    public void setSubject(String subject) {
-        this.subject = subject;
-    }
-
-    public String getBody() {
-        return body;
-    }
-
-    public void setBody(String body) {
-        this.body = body;
-    }
-
-    public String getCategory() {
-        return category;
-    }
-
-    public void setCategory(String category) {
-        this.category = category;
-    }
-
-    public AlertSeverity getSeverity() {
-        return severity;
-    }
-
-    public void setSeverity(AlertSeverity severity) {
-        this.severity = severity;
-    }
-
-    public String getSiteId() {
-        return siteId;
-    }
-
-    public void setSiteId(String siteId) {
-        this.siteId = siteId;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
deleted file mode 100644
index ecca0ff..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.model;
-
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import backtype.storm.tuple.Tuple;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * This is a critical data structure across spout, router bolt and alert bolt
- * partition[StreamPartition] defines how one incoming data stream is partitioned, sorted
- * partitionKey[long] is java hash value of groupby fields. The groupby fields are defined in StreamPartition
- * event[StreamEvent] is actual data.
- */
-public class PartitionedEvent implements Serializable {
-    private static final long serialVersionUID = -3840016190614238593L;
-    private StreamPartition partition;
-    private long partitionKey;
-    private StreamEvent event;
-
-    /**
-     * Used for bolt-internal but not inter-bolts,
-     * will not pass across bolts.
-     */
-    private transient Tuple anchor;
-
-    public PartitionedEvent() {
-        this.event = null;
-        this.partition = null;
-        this.partitionKey = 0L;
-    }
-
-    public PartitionedEvent(StreamEvent event, StreamPartition partition, int partitionKey) {
-        this.event = event;
-        this.partition = partition;
-        this.partitionKey = partitionKey;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj == this) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (obj instanceof PartitionedEvent) {
-            PartitionedEvent another = (PartitionedEvent) obj;
-            return !(this.partitionKey != another.getPartitionKey()
-                    || !Objects.equals(this.event, another.getEvent())
-                    || !Objects.equals(this.partition, another.getPartition())
-                    || !Objects.equals(this.anchor, another.anchor));
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder()
-                .append(partitionKey)
-                .append(event)
-                .append(partition)
-                .append(anchor)
-                .build();
-    }
-
-    public StreamEvent getEvent() {
-        return event;
-    }
-
-    public void setEvent(StreamEvent event) {
-        this.event = event;
-    }
-
-    public StreamPartition getPartition() {
-        return partition;
-    }
-
-    public void setPartition(StreamPartition partition) {
-        this.partition = partition;
-    }
-
-    public void setPartitionKey(long partitionKey) {
-        this.partitionKey = partitionKey;
-    }
-
-    public long getPartitionKey() {
-        return this.partitionKey;
-    }
-
-    public String toString() {
-        return String.format("PartitionedEvent[partition=%s,event=%s,key=%s", partition, event, partitionKey);
-    }
-
-    public long getTimestamp() {
-        return (event != null) ? event.getTimestamp() : 0L;
-    }
-
-    public String getStreamId() {
-        return (event != null) ? event.getStreamId() : null;
-    }
-
-    public Object[] getData() {
-        return event != null ? event.getData() : null;
-    }
-
-    public boolean isSortRequired() {
-        return isPartitionRequired() && this.getPartition().getSortSpec() != null;
-    }
-
-    public boolean isPartitionRequired() {
-        return this.getPartition() != null;
-    }
-
-    public PartitionedEvent copy() {
-        PartitionedEvent copied = new PartitionedEvent();
-        copied.setEvent(this.getEvent());
-        copied.setPartition(this.partition);
-        copied.setPartitionKey(this.partitionKey);
-        return copied;
-    }
-
-    public Tuple getAnchor() {
-        return anchor;
-    }
-
-    public void setAnchor(Tuple anchor) {
-        this.anchor = anchor;
-    }
-
-    public PartitionedEvent withAnchor(Tuple tuple) {
-        this.setAnchor(tuple);
-        return this;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
deleted file mode 100644
index 130985f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.model;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.common.DateTimeUtil;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * @since Apr 5, 2016.
- */
-public class StreamEvent implements Serializable {
-    private static final long serialVersionUID = 2765116509856609763L;
-
-    protected String streamId;
-    protected Object[] data;
-    protected long timestamp;
-    protected String metaVersion;
-
-    public StreamEvent() {
-    }
-
-    public StreamEvent(String streamId, long timestamp, Object[] data) {
-        this.setStreamId(streamId);
-        this.setTimestamp(timestamp);
-        this.setData(data);
-    }
-
-    public StreamEvent(String streamId, long timestamp, Object[] data, String metaVersion) {
-        this.setStreamId(streamId);
-        this.setTimestamp(timestamp);
-        this.setData(data);
-        this.setMetaVersion(metaVersion);
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-
-    public void setData(Object[] data) {
-        this.data = data;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
-
-    public void setTimestamp(long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    public String getMetaVersion() {
-        return metaVersion;
-    }
-
-    public void setMetaVersion(String metaVersion) {
-        this.metaVersion = metaVersion;
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder().append(streamId).append(timestamp).append(data).append(metaVersion).build();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj == this) {
-            return true;
-        }
-        if (obj instanceof StreamEvent) {
-            StreamEvent another = (StreamEvent) obj;
-            return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp
-                    && Arrays.deepEquals(this.data, another.data) && Objects.equals(this.metaVersion, another.metaVersion);
-        }
-        return false;
-    }
-
-    @Override
-    public String toString() {
-        List<String> dataStrings = new ArrayList<>();
-        if (this.getData() != null) {
-            for (Object obj : this.getData()) {
-                if (obj != null) {
-                    dataStrings.add(obj.toString());
-                } else {
-                    dataStrings.add(null);
-                }
-            }
-        }
-        return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]",
-                this.getStreamId(),
-                DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
-                StringUtils.join(dataStrings, ","),
-                this.getMetaVersion());
-    }
-
-    public static StreamEventBuilder builder() {
-        return new StreamEventBuilder();
-    }
-
-    /**
-     * @return cloned new event object.
-     */
-    public StreamEvent copy() {
-        StreamEvent newEvent = new StreamEvent();
-        newEvent.setTimestamp(this.getTimestamp());
-        newEvent.setData(this.getData());
-        newEvent.setStreamId(this.getStreamId());
-        newEvent.setMetaVersion(this.getMetaVersion());
-        return newEvent;
-    }
-
-    public void copyFrom(StreamEvent event) {
-        this.setTimestamp(event.getTimestamp());
-        this.setData(event.getData());
-        this.setStreamId(event.getStreamId());
-        this.setMetaVersion(event.getMetaVersion());
-    }
-
-    public Object[] getData() {
-        return data;
-    }
-
-    public Object[] getData(StreamDefinition streamDefinition, List<String> column) {
-        ArrayList<Object> result = new ArrayList<>(column.size());
-        for (String colName : column) {
-            result.add(this.getData()[streamDefinition.getColumnIndex(colName)]);
-        }
-        return result.toArray();
-    }
-
-    public Object[] getData(StreamDefinition streamDefinition, String... column) {
-        ArrayList<Object> result = new ArrayList<>(column.length);
-        for (String colName : column) {
-            result.add(this.getData()[streamDefinition.getColumnIndex(colName)]);
-        }
-        return result.toArray();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
deleted file mode 100644
index 53101ef..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.model;
-
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class StreamEventBuilder {
-    private static final Logger LOG = LoggerFactory.getLogger(StreamEventBuilder.class);
-
-    private StreamEvent instance;
-    private StreamDefinition streamDefinition;
-
-    public StreamEventBuilder() {
-        instance = new StreamEvent();
-    }
-
-    public StreamEventBuilder schema(StreamDefinition streamDefinition) {
-        this.streamDefinition = streamDefinition;
-        if (instance.getStreamId() == null) {
-            instance.setStreamId(streamDefinition.getStreamId());
-        }
-        return this;
-    }
-
-    public StreamEventBuilder streamId(String streamId) {
-        instance.setStreamId(streamId);
-        return this;
-    }
-
-    public StreamEventBuilder attributes(Map<String, Object> data, StreamDefinition streamDefinition) {
-        this.schema(streamDefinition);
-        List<StreamColumn> columnList = streamDefinition.getColumns();
-        if (columnList != null && columnList.size() > 0) {
-            List<Object> values = new ArrayList<>(columnList.size());
-            for (StreamColumn column : columnList) {
-                values.add(data.getOrDefault(column.getName(), column.getDefaultValue()));
-            }
-            instance.setData(values.toArray());
-        } else if (LOG.isDebugEnabled()) {
-            LOG.warn("All data [{}] are ignored as no columns defined in schema {}", data, streamDefinition);
-        }
-        return this;
-    }
-
-    public StreamEventBuilder attributes(Map<String, Object> data) {
-        return attributes(data, this.streamDefinition);
-    }
-
-    public StreamEventBuilder attributes(Object... data) {
-        instance.setData(data);
-        return this;
-    }
-
-    public StreamEventBuilder timestamep(long timestamp) {
-        instance.setTimestamp(timestamp);
-        return this;
-    }
-
-    public StreamEventBuilder metaVersion(String metaVersion) {
-        instance.setMetaVersion(metaVersion);
-        return this;
-    }
-
-    public StreamEvent build() {
-        if (instance.getStreamId() == null) {
-            throw new IllegalArgumentException("streamId is null of event: " + instance);
-        }
-        return instance;
-    }
-
-    public StreamEventBuilder copyFrom(StreamEvent event) {
-        this.instance.copyFrom(event);
-        return this;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
deleted file mode 100644
index 461a23c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metric;
-
-import org.apache.eagle.alert.metric.sink.MetricSink;
-import org.apache.eagle.alert.metric.source.MetricSource;
-
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
-import java.util.Map;
-
-public interface IMetricSystem {
-
-    /**
-     * Initialize.
-     */
-    void start();
-
-    /**
-     * Schedule reporter.
-     */
-    void schedule();
-
-    /**
-     * Close and stop all resources and services.
-     */
-    void stop();
-
-    /**
-     * Manual report metric.
-     */
-    void report();
-
-    /**
-     * @param sink metric sink.
-     */
-    void register(MetricSink sink, Config config);
-
-    /**
-     * @param source metric source.
-     */
-    void register(MetricSource source);
-
-    void tags(Map<String, Object> metricTags);
-
-    MetricRegistry registry();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java
deleted file mode 100644
index 81aa75c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.metric;
-
-public class MetricConfigs {
-    public static final String METRIC_PREFIX_CONF = "metric.prefix";
-    public static final String METRIC_SINK_CONF = "metric.sink";
-    public static final String DURATION_SECONDS_CONF = "metric.durationSeconds";
-    public static final String TAGS_FIELD_NAME = "tags";
-}
\ No newline at end of file