Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:40 UTC

[32/84] [partial] eagle git commit: Clean repo for eagle site

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
deleted file mode 100644
index 8f6cbc6..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.provider;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @since Mar 28, 2016.
- */
-public class InMemScheduleConext implements IScheduleContext {
-
-    private Map<String, Topology> topologies = new HashMap<String, Topology>();
-    private Map<String, TopologyUsage> usages = new HashMap<String, TopologyUsage>();
-    private Map<String, PolicyDefinition> policies = new HashMap<String, PolicyDefinition>();
-    private Map<String, Kafka2TupleMetadata> datasources = new HashMap<String, Kafka2TupleMetadata>();
-    private Map<String, PolicyAssignment> policyAssignments = new HashMap<String, PolicyAssignment>();
-    private Map<String, StreamDefinition> schemas = new HashMap<String, StreamDefinition>();
-    private Map<StreamGroup, MonitoredStream> monitoredStreams = new HashMap<StreamGroup, MonitoredStream>();
-    private Map<String, Publishment> publishments = new HashMap<String, Publishment>();
-
-    public InMemScheduleConext() {
-    }
-
-    public InMemScheduleConext(IScheduleContext context) {
-        this.topologies = new HashMap<String, Topology>(context.getTopologies());
-        this.usages = new HashMap<String, TopologyUsage>(context.getTopologyUsages());
-        this.policies = new HashMap<String, PolicyDefinition>(context.getPolicies());
-        this.datasources = new HashMap<String, Kafka2TupleMetadata>(context.getDataSourceMetadata());
-        this.policyAssignments = new HashMap<String, PolicyAssignment>(context.getPolicyAssignments());
-        this.schemas = new HashMap<String, StreamDefinition>(context.getStreamSchemas());
-        this.monitoredStreams = new HashMap<StreamGroup, MonitoredStream>(context.getMonitoredStreams());
-        this.publishments = new HashMap<String, Publishment>(context.getPublishments());
-    }
-
-    public InMemScheduleConext(Map<String, Topology> topologies2, Map<String, PolicyAssignment> assignments,
-                               Map<String, Kafka2TupleMetadata> kafkaSources, Map<String, PolicyDefinition> policies2,
-                               Map<String, Publishment> publishments2, Map<String, StreamDefinition> streamDefinitions,
-                               Map<StreamGroup, MonitoredStream> monitoredStreamMap, Map<String, TopologyUsage> usages2) {
-        this.topologies = topologies2;
-        this.policyAssignments = assignments;
-        this.datasources = kafkaSources;
-        this.policies = policies2;
-        this.publishments = publishments2;
-        this.schemas = streamDefinitions;
-        this.monitoredStreams = monitoredStreamMap;
-        this.usages = usages2;
-    }
-
-    public Map<String, Topology> getTopologies() {
-        return topologies;
-    }
-
-    public void addTopology(Topology topo) {
-        topologies.put(topo.getName(), topo);
-    }
-
-    public Map<String, TopologyUsage> getTopologyUsages() {
-        return usages;
-    }
-
-    public void addTopologyUsages(TopologyUsage usage) {
-        usages.put(usage.getTopoName(), usage);
-    }
-
-    public Map<String, PolicyDefinition> getPolicies() {
-        return policies;
-    }
-
-    public void addPoilcy(PolicyDefinition pd) {
-        this.policies.put(pd.getName(), pd);
-    }
-
-    public Map<String, Kafka2TupleMetadata> getDatasources() {
-        return datasources;
-    }
-
-    public void setDatasources(Map<String, Kafka2TupleMetadata> datasources) {
-        this.datasources = datasources;
-    }
-
-    public void addDataSource(Kafka2TupleMetadata dataSource) {
-        this.datasources.put(dataSource.getName(), dataSource);
-    }
-
-    @Override
-    public Map<String, Kafka2TupleMetadata> getDataSourceMetadata() {
-        return datasources;
-    }
-
-    public void setPolicyOrderedTopologies(Map<String, PolicyAssignment> policyAssignments) {
-        this.policyAssignments = policyAssignments;
-    }
-
-    public Map<String, PolicyAssignment> getPolicyAssignments() {
-        return this.policyAssignments;
-    }
-
-    @Override
-    public Map<String, StreamDefinition> getStreamSchemas() {
-        return schemas;
-    }
-
-    public void addSchema(StreamDefinition schema) {
-        this.schemas.put(schema.getStreamId(), schema);
-    }
-
-    public void setStreamSchemas(Map<String, StreamDefinition> schemas) {
-        this.schemas = schemas;
-    }
-
-    @Override
-    public Map<StreamGroup, MonitoredStream> getMonitoredStreams() {
-        return monitoredStreams;
-    }
-
-    @Override
-    public Map<String, Publishment> getPublishments() {
-        return publishments;
-    }
-
-    public void addPublishment(Publishment pub) {
-        this.publishments.put(pub.getName(), pub);
-    }
-
-}

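Note: the deleted InMemScheduleConext above is a plain map-backed IScheduleContext; its copy constructor takes shallow snapshots of another context's top-level maps. A minimal usage sketch under that reading (illustration only, not part of this commit; the stream id is made up):

import org.apache.eagle.alert.coordinator.IScheduleContext;
import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;

public class ScheduleContextSketch {
    public static void main(String[] args) {
        InMemScheduleConext context = new InMemScheduleConext();

        // addSchema keys the definition by its stream id.
        StreamDefinition schema = new StreamDefinition();
        schema.setStreamId("example_stream"); // hypothetical stream id
        context.addSchema(schema);

        // The copy constructor copies the top-level maps, so entries added to
        // 'context' afterwards are not visible through 'snapshot' (the map
        // values themselves are still shared).
        IScheduleContext snapshot = new InMemScheduleConext(context);
        System.out.println(snapshot.getStreamSchemas().containsKey("example_stream")); // true
    }
}
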
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
deleted file mode 100644
index 7f5dcdc..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.provider;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.StringUtils;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class NodataMetadataGenerator {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NodataMetadataGenerator.class);
-
-    private static final String NODATA_ALERT_AGGR_STREAM = "nodata_alert_aggregation_stream";
-    private static final String NODATA_ALERT_AGGR_OUTPUT_STREAM = "nodata_alert_aggregation_output_stream";
-    private static final String NODATA_ALERT_AGGR_DATASOURCE_NAME = "nodata_alert_aggregation_ds";
-    private static final String NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME = "nodata_alert_aggregation_output_ds";
-    private static final String NODATA_ALERT_AGGR_TOPIC_NAME = "nodata_alert_aggregation";
-    private static final String NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME = "nodata_alert";
-
-    private static final String DATASOURCE_TYPE = "KAFKA";
-    private static final String DATASOURCE_SCHEME_CLS = "org.apache.eagle.alert.engine.scheme.JsonScheme";
-
-    private static final String NODATA_ALERT_AGGR_POLICY_TYPE = "nodataalert";
-    private static final String NODATA_ALERT_AGGR_OUTPUT_POLICY_TYPE = "siddhi";
-
-    private static final String JSON_STRING_STREAM_NAME_SELECTOR_CLS = "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector";
-    private static final String STREAM_TIMESTAMP_COLUMN_NAME = "timestamp";
-    private static final String STREAM_TIMESTAMP_FORMAT = "";
-
-    private static final String KAFKA_PUBLISHMENT_TYPE = "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher";
-    private static final String EMAIL_PUBLISHMENT_TYPE = "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher";
-
-    private static final String PUBLISHMENT_DEDUP_DURATION = "PT0M";
-    private static final String PUBLISHMENT_SERIALIZER = "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer";
-
-    public NodataMetadataGenerator() {
-    }
-
-    public void execute(Config config, Map<String, StreamDefinition> streamDefinitionsMap,
-                        Map<String, Kafka2TupleMetadata> kafkaSources,
-                        Map<String, PolicyDefinition> policies, Map<String, Publishment> publishments) {
-        Collection<StreamDefinition> streamDefinitions = streamDefinitionsMap.values();
-        for (StreamDefinition streamDefinition : streamDefinitions) {
-            StreamColumn columnWithNodataExpression = null;
-            for (StreamColumn column : streamDefinition.getColumns()) {
-                if (StringUtils.isNotBlank(column.getNodataExpression())) {
-                    // has nodata alert setting, needs to generate the nodata alert policy
-                    if (columnWithNodataExpression != null) {
-                        columnWithNodataExpression = null;
-                        LOG.warn("Only one column in one stream is allowed to configure nodata alert");
-                        break;
-                    }
-                    columnWithNodataExpression = column;
-                }
-            }
-            if (columnWithNodataExpression != null) {
-                final String streamName = streamDefinition.getStreamId();
-
-                // create nodata alert aggr stream
-                if (streamDefinitionsMap.containsKey(NODATA_ALERT_AGGR_STREAM)) {
-                    LOG.info("Nodata alert aggregation stream: {} already exists", NODATA_ALERT_AGGR_STREAM);
-                } else {
-                    streamDefinitionsMap.put(NODATA_ALERT_AGGR_STREAM, buildAggregationStream());
-                    LOG.info("Created nodata alert aggregation stream: {}", NODATA_ALERT_AGGR_STREAM);
-                }
-
-                // create nodata alert aggr output stream
-                if (streamDefinitionsMap.containsKey(NODATA_ALERT_AGGR_OUTPUT_STREAM)) {
-                    LOG.info("Nodata alert aggregation output stream: {} already exists", NODATA_ALERT_AGGR_OUTPUT_STREAM);
-                } else {
-                    streamDefinitionsMap.put(NODATA_ALERT_AGGR_OUTPUT_STREAM, buildAggregationOutputStream());
-                    LOG.info("Created nodata alert aggregation output stream: {}", NODATA_ALERT_AGGR_OUTPUT_STREAM);
-                }
-
-                // create nodata alert data source
-                if (kafkaSources.containsKey(NODATA_ALERT_AGGR_DATASOURCE_NAME)) {
-                    LOG.info("Stream: {} nodata alert aggregation datasource: {} already exists",
-                        NODATA_ALERT_AGGR_STREAM, NODATA_ALERT_AGGR_DATASOURCE_NAME);
-                } else {
-                    kafkaSources.put(NODATA_ALERT_AGGR_DATASOURCE_NAME, buildAggregationDatasource());
-                    LOG.info("Created nodata alert aggregation datasource {} for stream {}",
-                        NODATA_ALERT_AGGR_DATASOURCE_NAME, NODATA_ALERT_AGGR_STREAM);
-                }
-
-                // create nodata alert aggregation output datasource
-                if (kafkaSources.containsKey(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME)) {
-                    LOG.info("Stream: {} nodata alert aggregation output datasource: {} already exists",
-                        NODATA_ALERT_AGGR_OUTPUT_STREAM, NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
-                } else {
-                    kafkaSources.put(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME, buildAggregationOutputDatasource());
-                    LOG.info("Created nodata alert aggregation output datasource {} for stream {}",
-                        NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME, NODATA_ALERT_AGGR_OUTPUT_STREAM);
-                }
-
-                // create nodata alert policy
-                String policyName = streamName + "_nodata_alert";
-                String nodataExpression = columnWithNodataExpression.getNodataExpression();
-                String[] segments = nodataExpression.split(",");
-                long windowPeriodInSeconds = TimePeriodUtils.getSecondsOfPeriod(Period.parse(segments[0]));
-                if (policies.containsKey(policyName)) {
-                    LOG.info("Stream: {} nodata alert policy: {} already exists", streamName, policyName);
-                } else {
-                    policies.put(policyName, buildDynamicNodataPolicy(
-                        streamName,
-                        policyName,
-                        columnWithNodataExpression.getName(),
-                        nodataExpression,
-                        Arrays.asList(streamName)));
-                    LOG.info("Created nodata alert policy {} with expression {} for stream {}",
-                        policyName, nodataExpression, streamName);
-                }
-
-                // create nodata alert aggregation
-                String aggrPolicyName = NODATA_ALERT_AGGR_STREAM + "_policy";
-                if (policies.containsKey(aggrPolicyName)) {
-                    LOG.info("Stream: {} nodata alert aggregation policy: {} already exists",
-                        NODATA_ALERT_AGGR_OUTPUT_STREAM, aggrPolicyName);
-                } else {
-                    policies.put(aggrPolicyName, buildAggregationPolicy(
-                        aggrPolicyName,
-                        columnWithNodataExpression.getName(),
-                        windowPeriodInSeconds));
-                    LOG.info("Created nodata alert aggregation policy {} for stream {}",
-                        aggrPolicyName, NODATA_ALERT_AGGR_OUTPUT_STREAM);
-                }
-
-                // create nodata alert publish
-                String publishmentName = policyName + "_publish";
-                if (publishments.containsKey(publishmentName)) {
-                    LOG.info("Stream: {} nodata alert publishment: {} already exists", streamName, publishmentName);
-                } else {
-                    String kafkaBroker = config.getString("kafkaProducer.bootstrapServers");
-                    publishments.put(publishmentName, buildKafkaAlertPublishment(
-                        publishmentName, policyName, kafkaBroker, NODATA_ALERT_AGGR_TOPIC_NAME));
-                    publishments.put(publishmentName + "_email", buildEmailAlertPublishment(config,
-                        publishmentName + "_email", policyName, kafkaBroker, NODATA_ALERT_AGGR_TOPIC_NAME));
-                    LOG.info("Created nodata alert publishment {} for stream {}", policyName + "_publish", streamName);
-                }
-
-                // create nodata alert aggregation publish
-                String aggrPublishName = aggrPolicyName + "_publish";
-                if (publishments.containsKey(aggrPublishName)) {
-                    LOG.info("Stream: {} publishment: {} already exists", NODATA_ALERT_AGGR_STREAM, aggrPublishName);
-                } else {
-                    String kafkaBroker = config.getString("kafkaProducer.bootstrapServers");
-                    publishments.put(aggrPublishName, buildKafkaAlertPublishment(
-                        aggrPublishName, aggrPolicyName, kafkaBroker, NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME));
-                    publishments.put(aggrPublishName + "_email", buildEmailAlertPublishment(config,
-                        aggrPublishName + "_email", aggrPolicyName, kafkaBroker, NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME));
-                    LOG.info("Created nodata alert publishment {} for stream {}", policyName + "_publish", streamName);
-                }
-            }
-        }
-    }
-
-    private Kafka2TupleMetadata buildAggregationDatasource() {
-        Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
-        datasource.setName(NODATA_ALERT_AGGR_DATASOURCE_NAME);
-        datasource.setType(DATASOURCE_TYPE);
-        datasource.setSchemeCls(DATASOURCE_SCHEME_CLS);
-        datasource.setTopic(NODATA_ALERT_AGGR_TOPIC_NAME);
-        Tuple2StreamMetadata codec = new Tuple2StreamMetadata();
-        codec.setStreamNameSelectorCls(JSON_STRING_STREAM_NAME_SELECTOR_CLS);
-        codec.setTimestampColumn(STREAM_TIMESTAMP_COLUMN_NAME);
-        codec.setTimestampFormat(STREAM_TIMESTAMP_FORMAT);
-        Properties codecProperties = new Properties();
-        codecProperties.put("userProvidedStreamName", NODATA_ALERT_AGGR_STREAM);
-        codecProperties.put("streamNameFormat", "%s");
-        codec.setStreamNameSelectorProp(codecProperties);
-        datasource.setCodec(codec);
-        return datasource;
-    }
-
-    private Kafka2TupleMetadata buildAggregationOutputDatasource() {
-        Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
-        datasource.setName(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
-        datasource.setType(DATASOURCE_TYPE);
-        datasource.setSchemeCls(DATASOURCE_SCHEME_CLS);
-        datasource.setTopic(NODATA_ALERT_AGGR_OUTPUT_TOPIC_NAME);
-        Tuple2StreamMetadata codec = new Tuple2StreamMetadata();
-        codec.setStreamNameSelectorCls(JSON_STRING_STREAM_NAME_SELECTOR_CLS);
-        codec.setTimestampColumn(STREAM_TIMESTAMP_COLUMN_NAME);
-        codec.setTimestampFormat(STREAM_TIMESTAMP_FORMAT);
-        Properties codecProperties = new Properties();
-        codecProperties.put("userProvidedStreamName", NODATA_ALERT_AGGR_OUTPUT_STREAM);
-        codecProperties.put("streamNameFormat", "%s");
-        codec.setStreamNameSelectorProp(codecProperties);
-        datasource.setCodec(codec);
-        return datasource;
-    }
-
-    private PolicyDefinition buildDynamicNodataPolicy(String streamName, String policyName,
-                                                      String columnName, String expression, List<String> inputStream) {
-        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        // expression, something like "PT5S,dynamic,1,host"
-        def.setValue(expression);
-        def.setType(NODATA_ALERT_AGGR_POLICY_TYPE);
-        Map<String, Object> properties = new HashMap<String, Object>();
-        properties.put("nodataColumnName", columnName);
-        def.setProperties(properties);
-        PolicyDefinition pd = new PolicyDefinition();
-        pd.setDefinition(def);
-        pd.setInputStreams(inputStream);
-        pd.setOutputStreams(Arrays.asList(NODATA_ALERT_AGGR_STREAM));
-        pd.setName(policyName);
-        pd.setDescription(String.format("Nodata alert policy for stream %s", streamName));
-
-        StreamPartition sp = new StreamPartition();
-        sp.setStreamId(streamName);
-        sp.setColumns(Arrays.asList(columnName));
-        sp.setType(StreamPartition.Type.GROUPBY);
-        pd.addPartition(sp);
-        return pd;
-    }
-
-    private PolicyDefinition buildAggregationPolicy(String policyName, String columnName,
-                                                    long windowPeriodInSeconds) {
-        final PolicyDefinition pd = new PolicyDefinition();
-        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        String siddhiQL = String.format(
-            "from %s#window.timeBatch(%s sec) select eagle:collectWithDistinct(%s) as hosts, "
-                + "originalStreamName as streamName group by originalStreamName insert into %s",
-            NODATA_ALERT_AGGR_STREAM, windowPeriodInSeconds * 2,
-            columnName, NODATA_ALERT_AGGR_OUTPUT_STREAM);
-        LOG.info("Generated SiddhiQL {} for stream: {}", siddhiQL, NODATA_ALERT_AGGR_STREAM);
-        def.setValue(siddhiQL);
-        def.setType(NODATA_ALERT_AGGR_OUTPUT_POLICY_TYPE);
-        pd.setDefinition(def);
-        pd.setInputStreams(Arrays.asList(NODATA_ALERT_AGGR_STREAM));
-        pd.setOutputStreams(Arrays.asList(NODATA_ALERT_AGGR_OUTPUT_STREAM));
-        pd.setName(policyName);
-        pd.setDescription("Nodata alert aggregation policy, used to merge alerts from multiple bolts");
-
-        StreamPartition sp = new StreamPartition();
-        sp.setStreamId(NODATA_ALERT_AGGR_STREAM);
-        sp.setColumns(Arrays.asList(columnName));
-        sp.setType(StreamPartition.Type.GROUPBY);
-        pd.addPartition(sp);
-        pd.setParallelismHint(0);
-        return pd;
-    }
-
-    private Publishment buildKafkaAlertPublishment(String publishmentName, String policyName, String kafkaBroker, String topic) {
-        Publishment publishment = new Publishment();
-        publishment.setName(publishmentName);
-        publishment.setType(KAFKA_PUBLISHMENT_TYPE);
-        publishment.setPolicyIds(Arrays.asList(policyName));
-        publishment.setDedupIntervalMin(PUBLISHMENT_DEDUP_DURATION);
-        Map<String, Object> publishmentProperties = new HashMap<>();
-        publishmentProperties.put("kafka_broker", kafkaBroker);
-        publishmentProperties.put("topic", topic);
-        publishment.setProperties(publishmentProperties);
-        publishment.setSerializer(PUBLISHMENT_SERIALIZER);
-        return publishment;
-    }
-
-    private Publishment buildEmailAlertPublishment(Config config,
-                                                   String publishmentName, String policyName, String kafkaBroker, String topic) {
-        Publishment publishment = new Publishment();
-        publishment.setName(publishmentName);
-        publishment.setType(EMAIL_PUBLISHMENT_TYPE);
-        publishment.setPolicyIds(Arrays.asList(policyName));
-        publishment.setDedupIntervalMin(PUBLISHMENT_DEDUP_DURATION);
-        Map<String, Object> publishmentProperties = new HashMap<>();
-        publishmentProperties.put("subject", String.format("Eagle Alert - %s", topic));
-        publishmentProperties.put("template", "");
-        publishmentProperties.put("sender", config.getString("email.sender"));
-        publishmentProperties.put("recipients", config.getString("email.recipients"));
-        publishmentProperties.put("mail.smtp.host", config.getString("email.mailSmtpHost"));
-        publishmentProperties.put("mail.smtp.port", config.getString("email.mailSmtpPort"));
-        publishmentProperties.put("connection", "plaintext");
-        publishment.setProperties(publishmentProperties);
-        publishment.setSerializer(PUBLISHMENT_SERIALIZER);
-        return publishment;
-    }
-
-    private StreamDefinition buildAggregationStream() {
-        final StreamDefinition sd = new StreamDefinition();
-        StreamColumn tsColumn = new StreamColumn();
-        tsColumn.setName("timestamp");
-        tsColumn.setType(StreamColumn.Type.LONG);
-
-        StreamColumn hostColumn = new StreamColumn();
-        hostColumn.setName("host");
-        hostColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn originalStreamNameColumn = new StreamColumn();
-        originalStreamNameColumn.setName("originalStreamName");
-        originalStreamNameColumn.setType(StreamColumn.Type.STRING);
-
-        sd.setColumns(Arrays.asList(tsColumn, hostColumn, originalStreamNameColumn));
-        sd.setDataSource(NODATA_ALERT_AGGR_DATASOURCE_NAME);
-        sd.setStreamId(NODATA_ALERT_AGGR_STREAM);
-        sd.setDescription("Nodata alert aggregation stream");
-        return sd;
-    }
-
-    private StreamDefinition buildAggregationOutputStream() {
-        final StreamDefinition sd = new StreamDefinition();
-        StreamColumn hostColumn = new StreamColumn();
-        hostColumn.setName("hosts");
-        hostColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn osnColumn = new StreamColumn();
-        osnColumn.setName("streamName");
-        osnColumn.setType(StreamColumn.Type.STRING);
-
-        sd.setColumns(Arrays.asList(hostColumn, osnColumn));
-        sd.setDataSource(NODATA_ALERT_AGGR_OUTPUT_DATASOURCE_NAME);
-        sd.setStreamId(NODATA_ALERT_AGGR_OUTPUT_STREAM);
-        sd.setDescription("Nodata alert aggregation output stream");
-        return sd;
-    }
-
-}

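Note: NodataMetadataGenerator above derives the aggregation window from the column's nodata expression, which per the inline comment looks like "PT5S,dynamic,1,host": an ISO-8601 period followed by policy parameters. A standalone sketch of that parsing step, using joda-time directly in place of Eagle's TimePeriodUtils (behaviour assumed equivalent for simple periods; illustration only, not part of this commit):

import org.joda.time.Period;

public class NodataExpressionSketch {
    public static void main(String[] args) {
        // Expression format per the deleted source's comment: "PT5S,dynamic,1,host"
        String nodataExpression = "PT5S,dynamic,1,host";
        String[] segments = nodataExpression.split(",");

        // Segment 0 is an ISO-8601 period; the generator converts it to seconds.
        long windowPeriodInSeconds = Period.parse(segments[0]).toStandardSeconds().getSeconds();
        System.out.println(windowPeriodInSeconds); // 5

        // buildAggregationPolicy batches over twice that window in its SiddhiQL:
        System.out.println("timeBatch(" + windowPeriodInSeconds * 2 + " sec)"); // timeBatch(10 sec)
    }
}
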
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
deleted file mode 100644
index b8e3824..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.provider;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.*;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.GroupBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition.PolicyStatus;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-/**
- * FIXME: this class focuses on correctness, not efficiency, for now. There might
- * be problems when the metadata size grows too big.
- *
- * @since May 3, 2016
- */
-public class ScheduleContextBuilder {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ScheduleContextBuilder.class);
-    private static final String UNIQUE_BOLT_ID = "%s-%s";// toponame-boltname
-
-    private Config config;
-    private IMetadataServiceClient client;
-
-    private Map<String, Topology> topologies;
-    private Map<String, PolicyAssignment> assignments;
-    private Map<String, Kafka2TupleMetadata> kafkaSources;
-    private Map<String, PolicyDefinition> policies;
-    private Map<String, Publishment> publishments;
-    private Map<String, StreamDefinition> streamDefinitions;
-    private Map<StreamGroup, MonitoredStream> monitoredStreamMap;
-    private Map<String, TopologyUsage> usages;
-    private IScheduleContext builtContext;
-
-    public ScheduleContextBuilder(Config config) {
-        this.config = config;
-        client = new MetadataServiceClientImpl(config);
-    }
-
-    public ScheduleContextBuilder(Config config, IMetadataServiceClient client) {
-        this.config = config;
-        this.client = client;
-    }
-
-    /**
-     * Build a schedule context from the metadata service.
-     *
-     * @return the built schedule context
-     */
-    public IScheduleContext buildContext() {
-        topologies = listToMap(client.listTopologies());
-        kafkaSources = listToMap(client.listDataSources());
-        // filter out disabled policies
-        List<PolicyDefinition> enabledPolicies = client.listPolicies().stream().filter(
-            (t) -> t.getPolicyStatus() != PolicyStatus.DISABLED).collect(Collectors.toList());
-        policies = listToMap(enabledPolicies);
-        publishments = listToMap(client.listPublishment());
-        streamDefinitions = listToMap(client.listStreams());
-        // generate data sources, policies, publishments for nodata alert
-        new NodataMetadataGenerator().execute(config, streamDefinitions, kafkaSources, policies, publishments);
-
-        // TODO: See ScheduleState comments on how to improve the storage
-        ScheduleState state = client.getVersionedSpec();
-
-        // detect policy updates, remove stale policy assignments.
-        // definition change : the assignment does NOT change; the runtime reloads and re-checks
-        // stream change : the assignment does NOT change; the runtime reloads and re-checks
-        // data source change : the assignment does NOT change; the runtime reloads and re-checks
-        // parallelism change : a policy's assignment is dropped when the hint exceeds its assigned queue size, and it is
-        //      expected to be re-assigned by the scheduler.
-        assignments = listToMap(state == null ? new ArrayList<PolicyAssignment>() : detectAssignmentsChange(state.getAssignments(), state));
-
-        monitoredStreamMap = listToMap(state == null ? new ArrayList<MonitoredStream>() : detectMonitoredStreams(state.getMonitoredStreams()));
-
-        // build based on existing data
-        usages = buildTopologyUsage();
-
-        // copy into the schedule context now
-        builtContext = new InMemScheduleConext(topologies, assignments, kafkaSources, policies, publishments,
-            streamDefinitions, monitoredStreamMap, usages);
-        return builtContext;
-    }
-
-    public IScheduleContext getBuiltContext() {
-        return builtContext;
-    }
-
-    /**
-     * 1.
-     * <pre>
-     * Check each policy's stream group against its assigned monitored stream groups.
-     * If they do not match, the policy's stream group has changed, so remove the policy assignment.
-     * If, in the end, no assignment refers to a given monitored stream, that monitored stream can be removed.
-     * Log every time a removal happens.
-     * </pre>
-     * 2.
-     * <pre>
-     * If a monitored stream's queue is on a non-existing topology, remove it.
-     * </pre>
-     *
-     * @param monitoredStreams the monitored streams from the current schedule state
-     * @return the monitored streams that are still referenced
-     */
-    private List<MonitoredStream> detectMonitoredStreams(List<MonitoredStream> monitoredStreams) {
-        List<MonitoredStream> result = new ArrayList<MonitoredStream>(monitoredStreams);
-
-        // clear deprecated streams
-        clearMonitoredStreams(monitoredStreams);
-
-        // build queueId-> streamGroup
-        Map<String, StreamGroup> queue2StreamGroup = new HashMap<String, StreamGroup>();
-        for (MonitoredStream ms : result) {
-            for (StreamWorkSlotQueue q : ms.getQueues()) {
-                queue2StreamGroup.put(q.getQueueId(), ms.getStreamGroup());
-            }
-        }
-
-        // decide the assignment delete set
-        Set<StreamGroup> usedGroups = new HashSet<StreamGroup>();
-        Set<String> toRemove = new HashSet<String>();
-        // check if queue is still referenced by policy assignments
-        for (PolicyAssignment assignment : assignments.values()) {
-            PolicyDefinition def = policies.get(assignment.getPolicyName());
-            StreamGroup group = queue2StreamGroup.get(assignment.getQueueId());
-            if (group == null || !Objects.equals(group.getStreamPartitions(), def.getPartitionSpec())) {
-                LOG.warn(" policy assgiment {} 's policy group {} is different to the monitored stream's partition group {}, "
-                        + "this indicates a policy stream partition spec change, the assignment would be removed! ",
-                    assignment, def.getPartitionSpec(), group == null ? "'not found'" : group.getStreamPartitions());
-                toRemove.add(assignment.getPolicyName());
-            } else {
-                usedGroups.add(group);
-            }
-        }
-
-        // remove useless
-        assignments.keySet().removeAll(toRemove);
-        // remove non-referenced monitored streams
-        result.removeIf((t) -> {
-            boolean used = usedGroups.contains(t.getStreamGroup());
-            if (!used) {
-                LOG.warn("monitor stream with stream group {} is not referenced, "
-                    + "this monitored stream and its worker queu will be removed", t.getStreamGroup());
-            }
-            return !used;
-        });
-
-        return result;
-    }
-
-    private void clearMonitoredStreams(List<MonitoredStream> monitoredStreams) {
-        Iterator<MonitoredStream> it = monitoredStreams.iterator();
-        while (it.hasNext()) {
-            MonitoredStream ms = it.next();
-            Iterator<StreamWorkSlotQueue> queueIt = ms.getQueues().iterator();
-            Set<String> usedQueueSet = new HashSet<>();
-            assignments.values().stream().forEach(assignment -> usedQueueSet.add(assignment.getQueueId()));
-
-            // clean queues whose underlying topology has changed (removed/down)
-            // clear queues that are no longer used
-            while (queueIt.hasNext()) {
-                StreamWorkSlotQueue queue = queueIt.next();
-                boolean deprecated = false;
-                for (WorkSlot ws : queue.getWorkingSlots()) {
-                    // check if topology available or bolt available
-                    if (!topologies.containsKey(ws.topologyName)
-                        || !topologies.get(ws.topologyName).getAlertBoltIds().contains(ws.boltId)) {
-                        deprecated = true;
-                        break;
-                    }
-                }
-                if (deprecated || !usedQueueSet.contains(queue.getQueueId())) {
-                    queueIt.remove();
-                }
-            }
-
-            if (ms.getQueues().isEmpty()) {
-                it.remove();
-            }
-        }
-    }
-
-    private List<PolicyAssignment> detectAssignmentsChange(List<PolicyAssignment> list, ScheduleState state) {
-        // FIXME: duplicated map building?
-        Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>();
-        for (MonitoredStream ms : state.getMonitoredStreams()) {
-            for (StreamWorkSlotQueue q : ms.getQueues()) {
-                queueMap.put(q.getQueueId(), q);
-            }
-        }
-
-        List<PolicyAssignment> result = new ArrayList<PolicyAssignment>(list);
-        Iterator<PolicyAssignment> paIt = result.iterator();
-        while (paIt.hasNext()) {
-            PolicyAssignment assignment = paIt.next();
-
-            if (!policies.containsKey(assignment.getPolicyName())) {
-                LOG.info("Policy assignment {} 's policy not found, this assignment will be removed!", assignment);
-                paIt.remove();
-            } else {
-                StreamWorkSlotQueue queue = queueMap.get(assignment.getQueueId());
-                if (queue == null
-                    || policies.get(assignment.getPolicyName()).getParallelismHint() != queue.getQueueSize()) {
-                    // queue not found, or the policy's parallelism hint no longer equals the queue size (possibly a policy update)
-                    LOG.info("Policy assignment {}'s policy doesn't match queue: {}!", assignment, queue);
-                    paIt.remove();
-                }
-            }
-        }
-        return result;
-    }
-
-    public static <T, K> Map<K, T> listToMap(List<T> collections) {
-        Map<K, T> maps = new HashMap<K, T>(collections.size());
-        for (T t : collections) {
-            maps.put(getKey(t), t);
-        }
-        return maps;
-    }
-
-    /*
-     * One drawback: every time we add a class, this code needs to change!
-     */
-    @SuppressWarnings("unchecked")
-    private static <T, K> K getKey(T t) {
-        if (t instanceof Topology) {
-            return (K) ((Topology) t).getName();
-        } else if (t instanceof PolicyAssignment) {
-            return (K) ((PolicyAssignment) t).getPolicyName();
-        } else if (t instanceof Kafka2TupleMetadata) {
-            return (K) ((Kafka2TupleMetadata) t).getName();
-        } else if (t instanceof PolicyDefinition) {
-            return (K) ((PolicyDefinition) t).getName();
-        } else if (t instanceof Publishment) {
-            return (K) ((Publishment) t).getName();
-        } else if (t instanceof StreamDefinition) {
-            return (K) ((StreamDefinition) t).getStreamId();
-        } else if (t instanceof MonitoredStream) {
-            return (K) ((MonitoredStream) t).getStreamGroup();
-        }
-        throw new RuntimeException("unexpected key class " + t.getClass());
-    }
-
-    private Map<String, TopologyUsage> buildTopologyUsage() {
-        Map<String, TopologyUsage> usages = new HashMap<String, TopologyUsage>();
-
-        // pre-build helper data structures
-        Map<String, Set<MonitoredStream>> topo2MonitorStream = new HashMap<String, Set<MonitoredStream>>();
-        Map<String, Set<String>> topo2Policies = new HashMap<String, Set<String>>();
-        // simply assume no bolt with the same id
-        Map<String, Set<String>> bolt2Policies = new HashMap<String, Set<String>>();
-        // simply assume no bolt with the same id
-        Map<String, Set<StreamGroup>> bolt2Partition = new HashMap<String, Set<StreamGroup>>();
-        // simply assume no bolt with the same id
-        Map<String, Set<String>> bolt2QueueIds = new HashMap<String, Set<String>>();
-        Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>();
-
-        preBuildQueue2TopoMap(topo2MonitorStream, topo2Policies, bolt2Policies, bolt2Partition, bolt2QueueIds, queueMap);
-
-        for (Topology t : topologies.values()) {
-            TopologyUsage u = new TopologyUsage(t.getName());
-            // add group/bolt usages
-            for (String grpBolt : t.getGroupNodeIds()) {
-                GroupBoltUsage grpUsage = new GroupBoltUsage(grpBolt);
-                u.getGroupUsages().put(grpBolt, grpUsage);
-            }
-            for (String alertBolt : t.getAlertBoltIds()) {
-                String uniqueBoltId = String.format(UNIQUE_BOLT_ID, t.getName(), alertBolt);
-
-                AlertBoltUsage alertUsage = new AlertBoltUsage(alertBolt);
-                u.getAlertUsages().put(alertBolt, alertUsage);
-                // complete usage
-                addBoltUsageInfo(bolt2Policies, bolt2Partition, bolt2QueueIds, uniqueBoltId, alertUsage, queueMap);
-            }
-
-            // policy -- policy assignment
-            if (topo2Policies.containsKey(u.getTopoName())) {
-                u.getPolicies().addAll(topo2Policies.get(u.getTopoName()));
-            }
-
-            // data source
-            buildTopologyDataSource(u);
-
-            // topology usage monitored stream -- from the monitored streams' queue slot item info
-            if (topo2MonitorStream.containsKey(u.getTopoName())) {
-                u.getMonitoredStream().addAll(topo2MonitorStream.get(u.getTopoName()));
-            }
-
-            usages.put(u.getTopoName(), u);
-        }
-
-        return usages;
-    }
-
-    private void addBoltUsageInfo(Map<String, Set<String>> bolt2Policies,
-                                  Map<String, Set<StreamGroup>> bolt2Partition, Map<String, Set<String>> bolt2QueueIds, String uniqueAlertBolt,
-                                  AlertBoltUsage alertUsage, Map<String, StreamWorkSlotQueue> queueMap) {
-        //
-        if (bolt2Policies.containsKey(uniqueAlertBolt)) {
-            alertUsage.getPolicies().addAll(bolt2Policies.get(uniqueAlertBolt));
-        }
-        //
-        if (bolt2Partition.containsKey(uniqueAlertBolt)) {
-            alertUsage.getPartitions().addAll(bolt2Partition.get(uniqueAlertBolt));
-        }
-        //
-        if (bolt2QueueIds.containsKey(uniqueAlertBolt)) {
-            for (String qId : bolt2QueueIds.get(uniqueAlertBolt)) {
-                if (queueMap.containsKey(qId)) {
-                    alertUsage.getReferQueues().add(queueMap.get(qId));
-                } else {
-                    LOG.error(" queue id {} not found in queue map !", qId);
-                }
-            }
-        }
-    }
-
-    private void buildTopologyDataSource(TopologyUsage u) {
-        for (String policyName : u.getPolicies()) {
-            PolicyDefinition def = policies.get(policyName);
-            if (def != null) {
-                u.getDataSources().addAll(findDatasource(def));
-            } else {
-                LOG.error(" policy not find {}, but reference in topology usage {} !", policyName, u.getTopoName());
-            }
-        }
-    }
-
-    private List<String> findDatasource(PolicyDefinition def) {
-        List<String> result = new ArrayList<String>();
-        List<String> inputStreams = def.getInputStreams();
-        for (String is : inputStreams) {
-            StreamDefinition ss = this.streamDefinitions.get(is);
-            if (ss == null) {
-                LOG.error("policy {} referenced stream definition {} not found in definiton !", def.getName(), is);
-            } else {
-                result.add(ss.getDataSource());
-            }
-        }
-        return result;
-    }
-
-    private void preBuildQueue2TopoMap(
-        Map<String, Set<MonitoredStream>> topo2MonitorStream,
-        Map<String, Set<String>> topo2Policies,
-        Map<String, Set<String>> bolt2Policies,
-        Map<String, Set<StreamGroup>> bolt2Partition,
-        Map<String, Set<String>> bolt2QueueIds,
-        Map<String, StreamWorkSlotQueue> queueMap) {
-        // pre-build structures
-        // why not reuse queue.getPolicies()?
-        Map<String, Set<String>> queue2Policies = new HashMap<String, Set<String>>();
-        for (PolicyAssignment pa : assignments.values()) {
-            if (!queue2Policies.containsKey(pa.getQueueId())) {
-                queue2Policies.put(pa.getQueueId(), new HashSet<String>());
-            }
-            queue2Policies.get(pa.getQueueId()).add(pa.getPolicyName());
-        }
-
-        for (MonitoredStream stream : monitoredStreamMap.values()) {
-            for (StreamWorkSlotQueue q : stream.getQueues()) {
-                queueMap.put(q.getQueueId(), q);
-                Set<String> policiesOnQ = queue2Policies.containsKey(q.getQueueId()) ? queue2Policies.get(q.getQueueId()) : new HashSet<String>();
-
-                for (WorkSlot slot : q.getWorkingSlots()) {
-                    // topo2monitoredstream
-                    if (!topo2MonitorStream.containsKey(slot.getTopologyName())) {
-                        topo2MonitorStream.put(slot.getTopologyName(), new HashSet<MonitoredStream>());
-                    }
-                    topo2MonitorStream.get(slot.getTopologyName()).add(stream);
-
-                    // topo2policy
-                    if (!topo2Policies.containsKey(slot.getTopologyName())) {
-                        topo2Policies.put(slot.getTopologyName(), new HashSet<String>());
-                    }
-                    topo2Policies.get(slot.getTopologyName()).addAll(policiesOnQ);
-
-                    // bolt2Policy
-                    if (!bolt2Policies.containsKey(getUniqueBoltId(slot))) {
-                        bolt2Policies.put(getUniqueBoltId(slot), new HashSet<String>());
-                    }
-                    bolt2Policies.get(getUniqueBoltId(slot)).addAll(policiesOnQ);
-
-                    // bolt2Queue
-                    if (!bolt2QueueIds.containsKey(getUniqueBoltId(slot))) {
-                        bolt2QueueIds.put(getUniqueBoltId(slot), new HashSet<String>());
-                    }
-                    bolt2QueueIds.get(getUniqueBoltId(slot)).add(q.getQueueId());
-
-                    // bolt2Partition
-                    if (!bolt2Partition.containsKey(getUniqueBoltId(slot))) {
-                        bolt2Partition.put(getUniqueBoltId(slot), new HashSet<StreamGroup>());
-                    }
-                    bolt2Partition.get(getUniqueBoltId(slot)).add(stream.getStreamGroup());
-                }
-            }
-        }
-    }
-
-    private String getUniqueBoltId(WorkSlot slot) {
-        return String.format(UNIQUE_BOLT_ID, slot.getTopologyName(), slot.getBoltId());
-    }
-
-}

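Note: the listToMap/getKey pair in ScheduleContextBuilder dispatches on instanceof and, as its own comment admits, must be edited whenever a class is added. A sketch of the usual alternative, swapped in plainly here: let each call site pass the key extractor, so no central type switch is needed (illustration only, not part of this commit):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ListToMapSketch {
    // Same contract as ScheduleContextBuilder.listToMap, but the key is
    // supplied by the caller instead of recovered via instanceof checks.
    public static <K, T> Map<K, T> listToMap(List<T> items, Function<T, K> key) {
        Map<K, T> map = new HashMap<>(items.size());
        for (T item : items) {
            map.put(key.apply(item), item);
        }
        return map;
    }

    public static void main(String[] args) {
        // e.g. policies = listToMap(client.listPolicies(), PolicyDefinition::getName);
        Map<Integer, String> byLength = listToMap(Arrays.asList("alert", "coordinator"), String::length);
        System.out.println(byLength); // {5=alert, 11=coordinator}
    }
}
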
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
deleted file mode 100644
index 9d83c4a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.resource;
-
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordinator.Coordinator;
-import org.apache.eagle.alert.coordinator.ScheduleOption;
-import org.apache.eagle.alert.coordinator.ValidateState;
-import org.apache.eagle.alert.utils.JsonUtils;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-
-/**
- * Provides API access even when we don't have ZK as an intermediary.
- * FIXME: more elegant status codes
- *
- * @since Mar 24, 2016 <br/>
- */
-@Path("/coordinator")
-@Produces( {"application/json"})
-public class CoordinatorResource {
-
-    // spring config here?
-    private Coordinator alertCoordinator = new Coordinator();
-
-    @GET
-    @Path("/assignments")
-    public String getAssignments() throws Exception {
-        ScheduleState state = alertCoordinator.getState();
-        return JsonUtils.writeValueAsString(state);
-    }
-
-    @POST
-    @Path("/build")
-    public String build() throws Exception {
-        ScheduleOption option = new ScheduleOption();
-        ScheduleState state = alertCoordinator.schedule(option);
-        return JsonUtils.writeValueAsString(state);
-    }
-
-    @POST
-    @Path("/validate")
-    public String validate() throws Exception {
-        ValidateState state = alertCoordinator.validate();
-        return JsonUtils.writeValueAsString(state);
-    }
-
-    @POST
-    @Path("/enablePeriodicForceBuild")
-    public void enforcePeriodicallyBuild() {
-        alertCoordinator.enforcePeriodicallyBuild();
-    }
-
-    @POST
-    @Path("/disablePeriodicForceBuild")
-    public void disablePeriodicallyBuild() {
-        alertCoordinator.disablePeriodicallyBuild();
-    }
-
-    @SuppressWarnings("static-access")
-    @GET
-    @Path("/periodicForceBuildState")
-    public boolean statPeriodicallyBuild() {
-        return alertCoordinator.isPeriodicallyForceBuildEnable();
-    }
-
-    /**
-     * Manually update the topology usages, for administration.
-     */
-    @POST
-    @Path("/refreshUsages")
-    public String refreshUsages() {
-        // TODO
-        return "";
-    }
-
-}

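Note: CoordinatorResource above exposes the coordinator over JAX-RS at /coordinator. A sketch of calling it with the JDK 11 HTTP client; the host, port, and servlet prefix are assumptions that depend on how the coordinator web app is deployed (illustration only, not part of this commit):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CoordinatorClientSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical base URL; adjust to the actual deployment.
        String base = "http://localhost:9090/rest/coordinator";
        HttpClient http = HttpClient.newHttpClient();

        // POST /coordinator/build runs a schedule and returns the ScheduleState as JSON.
        HttpRequest build = HttpRequest.newBuilder(URI.create(base + "/build"))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
        System.out.println(http.send(build, HttpResponse.BodyHandlers.ofString()).body());

        // GET /coordinator/assignments returns the current ScheduleState as JSON.
        HttpRequest assignments = HttpRequest.newBuilder(URI.create(base + "/assignments"))
            .GET()
            .build();
        System.out.println(http.send(assignments, HttpResponse.BodyHandlers.ofString()).body());
    }
}
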
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
deleted file mode 100644
index 454f47c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.trigger;
-
-import com.google.common.base.Stopwatch;
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.config.ConfigBusProducer;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordinator.*;
-import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * @since Jun 27, 2016.
- */
-public class CoordinatorTrigger implements Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorTrigger.class);
-
-    private Config config;
-    private IMetadataServiceClient client;
-
-    public CoordinatorTrigger(Config config, IMetadataServiceClient client) {
-        this.config = config;
-        this.client = client;
-    }
-
-    @Override
-    public void run() {
-        if (Coordinator.isPeriodicallyForceBuildEnable()) {
-            LOG.info("CoordinatorTrigger started ... ");
-
-            Stopwatch watch = Stopwatch.createStarted();
-            ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-            try (ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig)) {
-                executor.execute(Coordinator.GREEDY_SCHEDULER_ZK_PATH, () -> {
-                    // schedule
-                    IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext();
-                    TopologyMgmtService mgmtService = new TopologyMgmtService();
-                    IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
-
-                    scheduler.init(context, mgmtService);
-
-                    ScheduleState state = scheduler.schedule(new ScheduleOption());
-
-                    // try-with-resources: the AutoCloseable producer is closed automatically
-                    try (ConfigBusProducer producer = new ConfigBusProducer(ZKConfigBuilder.getZKConfig(config))) {
-                        Coordinator.postSchedule(client, state, producer);
-                    }
-
-                    watch.stop();
-                    LOG.info("CoordinatorTrigger ended, used time {} sm.", watch.elapsed(TimeUnit.MILLISECONDS));
-                });
-            } catch (Exception e) {
-                LOG.error("trigger schedule failed!", e);
-            }
-        } else {
-            LOG.info("CoordinatorTrigger found isPeriodicallyForceBuildEnable = false, skipped build");
-        }
-    }
-
-}

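Note: CoordinatorTrigger serializes schedule builds across coordinator instances by running the work inside a ZooKeeper-backed ExclusiveExecutor. The shape of that pattern, sketched with an in-process lock standing in for the ZK mutex (illustration only; the real executor coordinates across JVMs):

import java.util.concurrent.locks.ReentrantLock;

public class ExclusiveRunSketch {
    private static final ReentrantLock LOCK = new ReentrantLock(); // stand-in for the ZK mutex

    // Mirrors ExclusiveExecutor.execute(path, runnable): at most one runner
    // performs the schedule build at a time.
    static void executeExclusively(Runnable work) {
        LOCK.lock();
        try {
            work.run();
        } finally {
            LOCK.unlock();
        }
    }

    public static void main(String[] args) {
        executeExclusively(() ->
            System.out.println("build IScheduleContext, schedule, then publish via ConfigBusProducer"));
    }
}
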
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
deleted file mode 100644
index 7d0ead5..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.trigger;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import com.google.common.base.Stopwatch;
-import org.apache.commons.collections.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Polls for policy changes and notifies registered listeners.
- */
-public class DynamicPolicyLoader implements Runnable {
-    private static final Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
-
-    private IMetadataServiceClient client;
-    // initial cachedPolicies should be empty
-    private Map<String, PolicyDefinition> cachedPolicies = new HashMap<>();
-    private List<PolicyChangeListener> listeners = new ArrayList<>();
-
-    public DynamicPolicyLoader(IMetadataServiceClient client) {
-        this.client = client;
-    }
-
-    public synchronized void addPolicyChangeListener(PolicyChangeListener listener) {
-        listeners.add(listener);
-    }
-
-    /**
-     * On the first run cachedPolicies is empty, so every existing policy is
-     * reported as added.
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public void run() {
-        // catch everything so an unexpected failure cannot kill the polling thread
-        try {
-            final Stopwatch watch = Stopwatch.createStarted();
-            LOG.info("Starting to load policies");
-            List<PolicyDefinition> current = client.listPolicies();
-            Map<String, PolicyDefinition> currPolicies = new HashMap<>();
-            current.forEach(pe -> currPolicies.put(pe.getName(), pe));
-
-            Collection<String> addedPolicies = CollectionUtils.subtract(currPolicies.keySet(), cachedPolicies.keySet());
-            Collection<String> removedPolicies = CollectionUtils.subtract(cachedPolicies.keySet(), currPolicies.keySet());
-            Collection<String> potentiallyModifiedPolicies = CollectionUtils.intersection(currPolicies.keySet(), cachedPolicies.keySet());
-
-            List<String> reallyModifiedPolicies = new ArrayList<>();
-            for (String updatedPolicy : potentiallyModifiedPolicies) {
-                if (currPolicies.get(updatedPolicy) != null
-                        && !currPolicies.get(updatedPolicy).equals(cachedPolicies.get(updatedPolicy))) {
-                    reallyModifiedPolicies.add(updatedPolicy);
-                }
-            }
-
-            boolean policyChanged = !addedPolicies.isEmpty()
-                || !removedPolicies.isEmpty()
-                || !reallyModifiedPolicies.isEmpty();
-
-            if (!policyChanged) {
-                LOG.info("No policy (totally {}) changed since last round", current.size());
-                return;
-            }
-
-            synchronized (this) {
-                for (PolicyChangeListener listener : listeners) {
-                    listener.onPolicyChange(current, addedPolicies, removedPolicies, reallyModifiedPolicies);
-                }
-            }
-
-            watch.stop();
-
-            LOG.info("Finished loading {} policies, added: {}, removed: {}, modified: {}, taken: {} ms",
-                current.size(), addedPolicies.size(), removedPolicies.size(), reallyModifiedPolicies.size(), watch.elapsed(TimeUnit.MILLISECONDS));
-            // reset cached policies
-            cachedPolicies = currPolicies;
-        } catch (Throwable t) {
-            LOG.warn("Error loading policy, but continue to run", t);
-        }
-    }
-}
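
Note: DynamicPolicyLoader diffs only against its in-memory cache, so change detection is purely poll-based. PolicyChangeListener (the next file) has a single abstract method, so a listener can be supplied as a lambda. A method-body sketch, assuming the same executor wiring as above and an illustrative 60-second poll interval:

    DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
    // The first poll reports every existing policy as added, because the
    // loader starts with an empty cache.
    loader.addPolicyChangeListener((all, added, removed, modified) ->
        System.out.printf("policies: +%d -%d ~%d (total %d)%n",
            added.size(), removed.size(), modified.size(), all.size()));
    Executors.newSingleThreadScheduledExecutor()
        .scheduleAtFixedRate(loader, 0, 60, TimeUnit.SECONDS);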

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
deleted file mode 100644
index d36765f..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.trigger;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import java.util.Collection;
-import java.util.List;
-
-public interface PolicyChangeListener {
-    void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies, Collection<String> removedPolicies, Collection<String> modifiedPolicies);
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java
deleted file mode 100644
index 0229c20..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.coordinator.trigger;
-
-import com.google.common.base.Stopwatch;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-public class ScheduleStateCleaner implements Runnable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ScheduleStateCleaner.class);
-
-    private IMetadataServiceClient client;
-    private int reservedCapacity;
-
-    public ScheduleStateCleaner(IMetadataServiceClient client, int capacity) {
-        this.client = client;
-        this.reservedCapacity = capacity;
-    }
-
-    @Override
-    public void run() {
-        // catch everything so an unexpected failure cannot kill the scheduled thread
-        try {
-            final Stopwatch watch = Stopwatch.createStarted();
-            LOG.info("clear schedule states start.");
-            client.clearScheduleState(reservedCapacity);
-            watch.stop();
-            LOG.info("clear schedule states completed. used time milliseconds: {}", watch.elapsed(TimeUnit.MILLISECONDS));
-            // reset cached policies
-        } catch (Throwable t) {
-            LOG.error("fail to clear schedule states due to {}, but continue to run", t.getMessage());
-        }
-    }
-}
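
Note: judging by the field name, reservedCapacity appears to be the number of historical schedule states the metadata store retains, with everything older pruned. A one-line method-body sketch; the retention of 10 is illustrative, not from this commit:

    // One-off pruning: keep what appears to be the 10 newest schedule states.
    new ScheduleStateCleaner(client, 10).run();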

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
deleted file mode 100644
index 7dadbf5..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.alert.coordinator;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.eagle.alert.config.ConfigBusConsumer;
-import org.apache.eagle.alert.config.ConfigBusProducer;
-import org.apache.eagle.alert.config.ConfigChangeCallback;
-import org.apache.eagle.alert.config.ConfigValue;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordinator.Coordinator;
-import org.apache.eagle.alert.coordinator.ScheduleOption;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.apache.eagle.alert.utils.ZookeeperEmbedded;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * @since May 5, 2016
- */
-public class CoordinatorTest {
-
-    private static ZookeeperEmbedded zkEmbed;
-
-    @BeforeClass
-    public static void setup() throws Exception {
-        zkEmbed = new ZookeeperEmbedded(2181);
-        int zkPort = zkEmbed.start();
-        System.setProperty("coordinator.zkConfig.zkQuorum","localhost:"+ zkPort);
-    }
-
-    @AfterClass
-    public static void teardown() {
-        zkEmbed.shutdown();
-    }
-
-    @SuppressWarnings({"resource", "unused"})
-    @Ignore
-    @Test
-    public void test() throws Exception {
-        before();
-        Config config = ConfigFactory.load().getConfig("coordinator");
-        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        IMetadataServiceClient client = new MetadataServiceClientImpl(config);
-
-        Coordinator coordinator = new Coordinator(config, zkConfig, client);
-        ScheduleOption option = new ScheduleOption();
-        ScheduleState state = coordinator.schedule(option);
-        String v = state.getVersion();
-
-        AtomicBoolean validated = new AtomicBoolean(false);
-        ConfigBusConsumer consumer = new ConfigBusConsumer(zkConfig, "topo1/spout", new ConfigChangeCallback() {
-            @Override
-            public void onNewConfig(ConfigValue value) {
-                String vId = value.getValue().toString();
-                Assert.assertEquals(v, vId);
-                validated.set(true);
-            }
-        });
-
-        Thread.sleep(1000);
-        Assert.assertTrue(validated.get());
-    }
-
-    @SuppressWarnings({"resource", "unused"})
-    @Test
-    public void test_01() throws Exception {
-        before();
-        Config config = ConfigFactory.load().getConfig("coordinator");
-        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        IMetadataServiceClient client = ScheduleContextBuilderTest.getSampleMetadataService();
-
-        Coordinator coordinator = new Coordinator(config, zkConfig, client);
-        ScheduleOption option = new ScheduleOption();
-        ScheduleState state = coordinator.schedule(option);
-        String v = state.getVersion();
-
-        // TODO: assert version
-
-        CountDownLatch latch = new CountDownLatch(1);
-        AtomicBoolean validated = new AtomicBoolean(false);
-        ConfigBusConsumer consumer = new ConfigBusConsumer(zkConfig, "topo1/spout", new ConfigChangeCallback() {
-            @Override
-            public void onNewConfig(ConfigValue value) {
-                String vId = value.getValue().toString();
-                Assert.assertEquals(v, vId);
-                validated.set(true);
-                latch.countDown();
-            }
-        });
-
-        latch.await(3, TimeUnit.SECONDS);
-        Assert.assertTrue(validated.get());
-    }
-
-    @Before
-    public void before() {
-        System.setProperty("config.resource", "/test-application.conf");
-        ConfigFactory.invalidateCaches();
-        ConfigFactory.load().getConfig("coordinator");
-    }
-
-    @Test
-    public void test_Schedule() {
-        Coordinator.startSchedule();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
deleted file mode 100644
index a86dd04..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.alert.coordinator;
-
-/**
- * @since Apr 28, 2016.
- */
-@org.junit.Ignore
-public class DynamicPolicyLoaderTest {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
deleted file mode 100644
index d71dd88..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.alert.coordinator;
-
-import org.apache.eagle.alert.config.ConfigBusProducer;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordinator.Coordinator;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * @since May 9, 2016
- */
-public class MetadataServiceClientImplTest {
-
-    @Ignore
-    @Test
-    public void addScheduleState() throws Exception {
-        ConfigFactory.invalidateCaches();
-        System.setProperty("config.resource", "/test-application.conf");
-        Config config = ConfigFactory.load("test-application.conf").getConfig("coordinator");
-        MetadataServiceClientImpl client = new MetadataServiceClientImpl(config);
-
-        ScheduleState ss = new ScheduleState();
-        ss.setVersion("spec_version_1463764252582");
-
-        client.addScheduleState(ss);
-
-        client.close();
-
-        ss.setVersion("spec_version_1464764252582");
-        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        try (ConfigBusProducer producer = new ConfigBusProducer(zkConfig)) {
-            Coordinator.postSchedule(client, ss, producer);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java
deleted file mode 100644
index 9d2b9c7..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/NodataMetadataGeneratorTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.alert.coordinator;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordinator.provider.NodataMetadataGenerator;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-public class NodataMetadataGeneratorTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NodataMetadataGeneratorTest.class);
-
-    Config config = ConfigFactory.load().getConfig("coordinator");
-    private NodataMetadataGenerator generator;
-
-    @Before
-    public void setup() {
-        generator = new NodataMetadataGenerator();
-    }
-
-    @Test
-    public void testNormal() throws Exception {
-        StreamDefinition sd = createStreamDefinitionWithNodataAlert();
-        Map<String, StreamDefinition> streamDefinitionsMap = new HashMap<String, StreamDefinition>();
-        streamDefinitionsMap.put(sd.getStreamId(), sd);
-
-        Map<String, Kafka2TupleMetadata> kafkaSources = new HashMap<String, Kafka2TupleMetadata>();
-        Map<String, PolicyDefinition> policies = new HashMap<String, PolicyDefinition>();
-        Map<String, Publishment> publishments = new HashMap<String, Publishment>();
-
-        generator.execute(config, streamDefinitionsMap, kafkaSources, policies, publishments);
-
-        Assert.assertEquals(2, kafkaSources.size());
-
-        kafkaSources.forEach((key, value) -> {
-            LOG.info("KafkaSources > {}: {}", key, ToStringBuilder.reflectionToString(value));
-        });
-
-        Assert.assertEquals(2, policies.size());
-
-        policies.forEach((key, value) -> {
-            LOG.info("Policies > {}: {}", key, ToStringBuilder.reflectionToString(value));
-        });
-
-        Assert.assertEquals(4, publishments.size());
-
-        publishments.forEach((key, value) -> {
-            LOG.info("Publishments > {}: {}", key, ToStringBuilder.reflectionToString(value));
-        });
-    }
-
-    private StreamDefinition createStreamDefinitionWithNodataAlert() {
-        StreamDefinition sd = new StreamDefinition();
-        StreamColumn tsColumn = new StreamColumn();
-        tsColumn.setName("timestamp");
-        tsColumn.setType(StreamColumn.Type.LONG);
-
-        StreamColumn hostColumn = new StreamColumn();
-        hostColumn.setName("host");
-        hostColumn.setType(StreamColumn.Type.STRING);
-        hostColumn.setNodataExpression("PT1M,dynamic,1,host");
-
-        StreamColumn valueColumn = new StreamColumn();
-        valueColumn.setName("value");
-        valueColumn.setType(StreamColumn.Type.DOUBLE);
-
-        sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
-        sd.setDataSource("testDataSource");
-        sd.setStreamId("testStreamId");
-        return sd;
-    }
-
-}
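
Note on the nodata expression exercised above, with a hedged reading: only the ISO-8601 duration and the trailing column name are evident from this test; the middle fields are copied verbatim, and their exact semantics are not shown in this diff.

    // "PT1M,dynamic,1,host"
    //   PT1M    -> ISO-8601 duration: a one-minute no-data window
    //   dynamic -> detection mode (copied verbatim from the test; assumed)
    //   1       -> numeric parameter (copied verbatim; semantics not shown here)
    //   host    -> the stream column watched for missing values
    StreamColumn host = new StreamColumn();
    host.setName("host");
    host.setType(StreamColumn.Type.STRING);
    host.setNodataExpression("PT1M,dynamic,1,host");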