Posted to commits@eagle.apache.org by ha...@apache.org on 2017/04/03 11:54:17 UTC

[09/84] [partial] eagle git commit: Clean repo for eagle site

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
deleted file mode 100644
index 2af49fb..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.impl;
-
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.resource.Models;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.stream.Collectors;
-
-/**
- * In-memory metadata store intended for a simple service start. All
- * service APIs are synchronized.
- *
- * @since Apr 11, 2016
- */
-public class InMemMetadataDaoImpl implements IMetadataDao {
-
-    private static final Logger LOG = LoggerFactory.getLogger(InMemMetadataDaoImpl.class);
-
-    private List<StreamingCluster> clusters = new ArrayList<>();
-    private List<StreamDefinition> schemas = new ArrayList<>();
-    private List<Kafka2TupleMetadata> datasources = new ArrayList<>();
-    private List<PolicyDefinition> policies = new ArrayList<>();
-    private List<Publishment> publishments = new ArrayList<>();
-    private List<PublishmentType> publishmentTypes = new ArrayList<>();
-    private volatile int maxScheduleState = 100;
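-    // TreeMap keyed by the schedule state's version string; assuming versions
-    // sort lexicographically, firstKey() is the oldest state and lastKey() the latest.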
-    private SortedMap<String, ScheduleState> scheduleStates = new TreeMap<>();
-    private List<PolicyAssignment> assignments = new ArrayList<>();
-    private List<Topology> topologies = new ArrayList<>();
-    private List<AlertPublishEvent> alerts = new ArrayList<>();
-
-    @Inject
-    public InMemMetadataDaoImpl(Config config) {
-    }
-
-    @Override
-    public synchronized List<StreamingCluster> listClusters() {
-        return clusters;
-    }
-
-    @Override
-    public OpResult addCluster(final StreamingCluster cluster) {
-        return addOrReplace(clusters, cluster);
-    }
-
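-    // Generic upsert helper: entries are matched by their metadata key
-    // (case-insensitive), and an existing entry with the same key is replaced.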
-    private synchronized <T> OpResult addOrReplace(List<T> list, T item) {
-        Optional<T> existing = list.stream()
-            .filter(t -> MetadataUtils.getKey(t).equalsIgnoreCase(MetadataUtils.getKey(item)))
-            .findFirst();
-
-        OpResult result = new OpResult();
-        if (existing.isPresent()) {
-            // replace the existing entry that has the same key
-            list.remove(existing.get());
-            result.message = "replaced the existing entry!";
-        } else {
-            result.message = "created a new entry!";
-        }
-        result.code = 200;
-        list.add(item);
-        return result;
-    }
-
-    @SuppressWarnings("unchecked")
-    private synchronized <T> OpResult remove(List<T> list, String id) {
-        T[] matched = (T[]) list.stream()
-            .filter(t -> MetadataUtils.getKey(t).equalsIgnoreCase(id))
-            .toArray();
-
-        OpResult result = new OpResult();
-        result.code = 200;
-        if (list.removeAll(Arrays.asList(matched))) {
-            result.message = "configuration item removed successfully";
-        } else {
-            result.message = "no configuration item removed";
-        }
-        return result;
-    }
-
-    @Override
-    public OpResult removeCluster(final String clusterId) {
-        return remove(clusters, clusterId);
-    }
-
-    @Override
-    public synchronized List<StreamDefinition> listStreams() {
-        return schemas;
-    }
-
-    @Override
-    public OpResult createStream(StreamDefinition stream) {
-        return addOrReplace(schemas, stream);
-    }
-
-    @Override
-    public OpResult removeStream(String streamId) {
-        return remove(schemas, streamId);
-    }
-
-    @Override
-    public synchronized List<Kafka2TupleMetadata> listDataSources() {
-        return datasources;
-    }
-
-    @Override
-    public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
-        return addOrReplace(datasources, dataSource);
-    }
-
-    @Override
-    public OpResult removeDataSource(String datasourceId) {
-        return remove(datasources, datasourceId);
-    }
-
-    @Override
-    public synchronized List<PolicyDefinition> listPolicies() {
-        return policies;
-    }
-
-    @Override
-    public OpResult addPolicy(PolicyDefinition policy) {
-        return addOrReplace(policies, policy);
-    }
-
-    @Override
-    public OpResult removePolicy(String policyId) {
-        return remove(policies, policyId);
-    }
-
-    @Override
-    public synchronized List<Publishment> listPublishment() {
-        return publishments;
-    }
-
-    @Override
-    public OpResult addPublishment(Publishment publishment) {
-        return addOrReplace(publishments, publishment);
-    }
-
-    @Override
-    public OpResult removePublishment(String pubId) {
-        return remove(publishments, pubId);
-    }
-
-    @Override
-    public List<PublishmentType> listPublishmentType() {
-        return publishmentTypes;
-    }
-
-    @Override
-    public OpResult addPublishmentType(PublishmentType publishmentType) {
-        return addOrReplace(publishmentTypes, publishmentType);
-    }
-
-    @Override
-    public OpResult removePublishmentType(String pubType) {
-        return remove(publishmentTypes, pubType);
-    }
-
-    @Override
-    public List<AlertPublishEvent> listAlertPublishEvent(int size) {
-        if (size > 0 && size <= alerts.size()) {
-            return alerts.subList(alerts.size() - size, alerts.size());
-        }
-        return alerts;
-    }
-
-    @Override
-    public AlertPublishEvent getAlertPublishEvent(String alertId) {
-        Optional<AlertPublishEvent> op = alerts.stream().filter(alert -> alert.getAlertId().equals(alertId)).findAny();
-        if (op.isPresent()) {
-            return op.get();
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size) {
-        List<AlertPublishEvent> result = alerts.stream().filter(alert -> alert.getPolicyId().equals(policyId)).collect(Collectors.toList());
-        if (size < 0 || size > result.size()) {
-            size = result.size();
-        }
-        return result.subList(result.size() - size, result.size());
-    }
-
-    @Override
-    public OpResult addAlertPublishEvent(AlertPublishEvent event) {
-        alerts.add(event);
-        OpResult result = new OpResult();
-        result.code = 200;
-        return result;
-    }
-
-
-    @Override
-    public synchronized OpResult addScheduleState(ScheduleState state) {
-        // FIXME: possible concurrency issue
-        String toRemove = null;
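-        // evict the oldest version (smallest key) once the capacity cap is exceeded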
-        if (scheduleStates.size() > maxScheduleState) {
-            toRemove = scheduleStates.firstKey();
-        }
-        scheduleStates.put(state.getVersion(), state);
-        if (toRemove != null) {
-            scheduleStates.remove(toRemove);
-        }
-
-        OpResult result = new OpResult();
-        result.code = 200;
-        result.message = "OK";
-        return result;
-    }
-
-    @Override
-    public synchronized ScheduleState getScheduleState() {
-        if (scheduleStates.size() > 0) {
-            return scheduleStates.get(scheduleStates.lastKey());
-        }
-        return null;
-    }
-
-    @Override
-    public ScheduleState getScheduleState(String versionId) {
-        return scheduleStates.get(versionId);
-    }
-
-    @Override
-    public List<ScheduleState> listScheduleStates() {
-        throw new UnsupportedOperationException("listScheduleStates not support!");
-    }
-
-    @Override
-    public OpResult clearScheduleState(int maxCapacity) {
-        throw new UnsupportedOperationException("clearScheduleState not support!");
-    }
-
-    @Override
-    public List<PolicyAssignment> listAssignments() {
-        return assignments;
-    }
-
-    @Override
-    public OpResult addAssignment(PolicyAssignment assignment) {
-        OpResult result = new OpResult();
-        result.code = 200;
-        result.message = "OK";
-        assignments.add(assignment);
-        return result;
-    }
-
-    @Override
-    public List<Topology> listTopologies() {
-        return topologies;
-    }
-
-    @Override
-    public OpResult addTopology(Topology t) {
-        return addOrReplace(topologies, t);
-    }
-
-    @Override
-    public OpResult removeTopology(String topologyName) {
-        return remove(topologies, topologyName);
-    }
-
-    @Override
-    public synchronized OpResult clear() {
-        LOG.info("clear models...");
-        this.assignments.clear();
-        this.clusters.clear();
-        this.datasources.clear();
-        this.policies.clear();
-        this.publishments.clear();
-        this.scheduleStates.clear();
-        this.schemas.clear();
-        this.topologies.clear();
-        OpResult result = new OpResult();
-        result.code = 200;
-        result.message = "OK";
-        return result;
-    }
-
-    @Override
-    public Models export() {
-        Models models = new Models();
-        models.assignments.addAll(this.assignments);
-        models.clusters.addAll(this.clusters);
-        models.datasources.addAll(this.datasources);
-        models.policies.addAll(this.policies);
-        models.publishments.addAll(this.publishments);
-        models.scheduleStates.putAll(this.scheduleStates);
-        models.schemas.addAll(this.schemas);
-        models.topologies.addAll(this.topologies);
-        return models;
-    }
-
-    @Override
-    public OpResult importModels(Models models) {
-        LOG.info("clear and import models...");
-        clear();
-        this.assignments.addAll(models.assignments);
-        this.clusters.addAll(models.clusters);
-        this.datasources.addAll(models.datasources);
-        this.policies.addAll(models.policies);
-        this.publishments.addAll(models.publishments);
-        this.scheduleStates.putAll(models.scheduleStates);
-        this.schemas.addAll(models.schemas);
-        this.topologies.addAll(models.topologies);
-        OpResult result = new OpResult();
-        result.code = 200;
-        result.message = "OK";
-        return result;
-    }
-
-    @Override
-    public void close() throws IOException {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
deleted file mode 100644
index e0b5c9d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.impl;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.resource.Models;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @since May 26, 2016.
- */
-public class JdbcMetadataDaoImpl implements IMetadataDao {
-    private static final Logger LOG = LoggerFactory.getLogger(JdbcMetadataDaoImpl.class);
-    private JdbcMetadataHandler handler;
-
-    @Inject
-    public JdbcMetadataDaoImpl(Config config) {
-        handler = new JdbcMetadataHandler(config.getConfig(MetadataUtils.META_DATA));
-    }
-
-    @Override
-    public List<Topology> listTopologies() {
-        return handler.list(Topology.class);
-    }
-
-    @Override
-    public List<StreamingCluster> listClusters() {
-        return handler.list(StreamingCluster.class);
-    }
-
-    @Override
-    public List<StreamDefinition> listStreams() {
-        return handler.list(StreamDefinition.class);
-    }
-
-    @Override
-    public List<Kafka2TupleMetadata> listDataSources() {
-        return handler.list(Kafka2TupleMetadata.class);
-    }
-
-    @Override
-    public List<PolicyDefinition> listPolicies() {
-        return handler.list(PolicyDefinition.class);
-    }
-
-    @Override
-    public List<Publishment> listPublishment() {
-        return handler.listPublishments();
-    }
-
-    @Override
-    public List<AlertPublishEvent> listAlertPublishEvent(int size) {
-        if (size <= 0) {
-            LOG.info("Invalid parameter size <= 0");
-            return new ArrayList<>();
-        }
-        return handler.listAlertEvents(null, null, size);
-    }
-
-    public PolicyDefinition getPolicyById(String policyId) {
-        return handler.queryById(PolicyDefinition.class, policyId);
-    }
-
-    public List<Publishment> getPublishmentsByPolicyId(String policyId) {
-        return handler.getPublishmentsByPolicyId(policyId);
-    }
-
-    @Override
-    public AlertPublishEvent getAlertPublishEvent(String alertId) {
-        return handler.getAlertEventById(alertId, 1);
-    }
-
-    @Override
-    public List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size) {
-        if (size <= 0) {
-            LOG.info("Invalid parameter size <= 0");
-            return new ArrayList<>();
-        }
-        return handler.getAlertEventByPolicyId(policyId, size);
-    }
-
-    @Override
-    public ScheduleState getScheduleState(String versionId) {
-        return handler.queryById(ScheduleState.class, versionId);
-    }
-
-    @Override
-    public ScheduleState getScheduleState() {
-        List<ScheduleState> scheduleStates =
-                handler.list(ScheduleState.class, JdbcMetadataHandler.SortType.DESC);
-        if (scheduleStates.isEmpty()) {
-            return null;
-        } else {
-            return scheduleStates.get(0);
-        }
-    }
-
-    @Override
-    public List<ScheduleState> listScheduleStates() {
-        return handler.list(ScheduleState.class);
-    }
-
-    @Override
-    public List<PolicyAssignment> listAssignments() {
-        return handler.list(PolicyAssignment.class);
-    }
-
-    @Override
-    public List<PublishmentType> listPublishmentType() {
-        return handler.list(PublishmentType.class);
-    }
-
-    @Override
-    public OpResult addTopology(Topology t) {
-        return handler.addOrReplace(Topology.class.getSimpleName(), t);
-    }
-
-    @Override
-    public OpResult addCluster(StreamingCluster cluster) {
-        return handler.addOrReplace(StreamingCluster.class.getSimpleName(), cluster);
-    }
-
-    @Override
-    public OpResult addAlertPublishEvent(AlertPublishEvent event) {
-        return handler.addAlertEvent(event);
-    }
-
-    @Override
-    public OpResult createStream(StreamDefinition stream) {
-        return handler.addOrReplace(StreamDefinition.class.getSimpleName(), stream);
-    }
-
-    @Override
-    public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
-        return handler.addOrReplace(Kafka2TupleMetadata.class.getSimpleName(), dataSource);
-    }
-
-    @Override
-    public OpResult addPolicy(PolicyDefinition policy) {
-        return handler.addOrReplace(PolicyDefinition.class.getSimpleName(), policy);
-    }
-
-    @Override
-    public OpResult addPublishment(Publishment publishment) {
-        return handler.addOrReplace(Publishment.class.getSimpleName(), publishment);
-    }
-
-    @Override
-    public OpResult addPublishmentsToPolicy(String policyId, List<String> publishmentIds) {
-        return handler.addPublishmentsToPolicy(policyId, publishmentIds);
-    }
-
-    @Override
-    public OpResult addScheduleState(ScheduleState state) {
-        return handler.addOrReplace(ScheduleState.class.getSimpleName(), state);
-    }
-
-    @Override
-    public OpResult clearScheduleState(int maxCapacity) {
-        if (maxCapacity <= 0) {
-            maxCapacity = 10;
-        }
-        OpResult result = handler.removeScheduleStates(maxCapacity);
-        LOG.info(result.message);
-        return result;
-    }
-
-    @Override
-    public OpResult addAssignment(PolicyAssignment assignment) {
-        return handler.addOrReplace(PolicyAssignment.class.getSimpleName(), assignment);
-    }
-
-    @Override
-    public OpResult addPublishmentType(PublishmentType publishmentType) {
-        return handler.addOrReplace(PublishmentType.class.getSimpleName(), publishmentType);
-    }
-
-    @Override
-    public OpResult removeTopology(String topologyName) {
-        return handler.removeById(Topology.class.getSimpleName(), topologyName);
-    }
-
-    @Override
-    public OpResult removeCluster(String clusterId) {
-        return handler.removeById(StreamingCluster.class.getSimpleName(), clusterId);
-    }
-
-    @Override
-    public OpResult removeStream(String streamId) {
-        return handler.removeById(StreamDefinition.class.getSimpleName(), streamId);
-    }
-
-    @Override
-    public OpResult removeDataSource(String datasourceId) {
-        return handler.removeById(Kafka2TupleMetadata.class.getSimpleName(), datasourceId);
-    }
-
-    @Override
-    public OpResult removePolicy(String policyId) {
-        return handler.removeById(PolicyDefinition.class.getSimpleName(), policyId);
-    }
-
-    @Override
-    public OpResult removePublishment(String pubId) {
-        return handler.removeById(Publishment.class.getSimpleName(), pubId);
-    }
-
-    @Override
-    public OpResult removePublishmentType(String name) {
-        return handler.removeById(PublishmentType.class.getSimpleName(), name);
-    }
-
-    @Override
-    public OpResult clear() {
-        throw new UnsupportedOperationException("clear not support!");
-    }
-
-    @Override
-    public Models export() {
-        throw new UnsupportedOperationException("clear not support!");
-    }
-
-    @Override
-    public OpResult importModels(Models models) {
-        throw new UnsupportedOperationException("clear not support!");
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (handler != null) {
-            handler.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
deleted file mode 100644
index a9e3c5e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.metadata.impl;
-
-import org.apache.commons.collections.map.HashedMap;
-import org.apache.commons.dbcp.BasicDataSource;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.sql.DataSource;
-import java.io.IOException;
-import java.sql.*;
-import java.util.*;
-import java.util.function.Function;
-
-public class JdbcMetadataHandler {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JdbcMetadataHandler.class);
-    // general model
-    private static final String INSERT_STATEMENT = "INSERT INTO %s(content, id) VALUES (?, ?)";
-    private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE id=?";
-    private static final String UPDATE_STATEMENT = "UPDATE %s set content=? WHERE id=?";
-    private static final String QUERY_ALL_STATEMENT = "SELECT content FROM %s";
-    private static final String QUERY_CONDITION_STATEMENT = "SELECT content FROM %s WHERE id=?";
-    private static final String QUERY_ORDERBY_STATEMENT = "SELECT content FROM %s ORDER BY id %s";
-
-    // customized model
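-    // Note: the nested SELECT below is presumably a workaround for MySQL's
-    // restriction on using LIMIT inside an IN (...) subquery.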
-    private static final String CLEAR_SCHEDULESTATES_STATEMENT = "DELETE FROM schedule_state WHERE id NOT IN (SELECT id from (SELECT id FROM schedule_state ORDER BY id DESC limit ?) as states)";
-    private static final String INSERT_ALERT_STATEMENT = "INSERT INTO alert_event(alertId, siteId, appIds, policyId, alertTimestamp, policyValue, alertData) VALUES (?, ?, ?, ?, ?, ?, ?)";
-    private static final String QUERY_ALERT_STATEMENT = "SELECT * FROM alert_event order by alertTimestamp DESC limit ?";
-    private static final String QUERY_ALERT_BY_ID_STATEMENT = "SELECT * FROM alert_event WHERE alertId=? order by alertTimestamp DESC limit ?";
-    private static final String QUERY_ALERT_BY_POLICY_STATEMENT = "SELECT * FROM alert_event WHERE policyId=? order by alertTimestamp DESC limit ?";
-    private static final String INSERT_POLICYPUBLISHMENT_STATEMENT = "INSERT INTO policy_publishment(policyId, publishmentName) VALUES (?, ?)";
-    private static final String DELETE_PUBLISHMENT_STATEMENT = "DELETE FROM policy_publishment WHERE policyId=?";
-    private static final String QUERY_PUBLISHMENT_BY_POLICY_STATEMENT = "SELECT content FROM publishment a INNER JOIN policy_publishment b ON a.id=b.publishmentName and b.policyId=?";
-    private static final String QUERY_PUBLISHMENT_STATEMENT = "SELECT a.content, b.policyId FROM publishment a LEFT JOIN policy_publishment b ON a.id=b.publishmentName";
-
-    public enum SortType { DESC, ASC }
-
-    private static Map<String, String> tblNameMap = new HashMap<>();
-
-    private static final ObjectMapper mapper = new ObjectMapper();
-    private DataSource dataSource;
-
-    static {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-        registerTableName(StreamingCluster.class.getSimpleName(), "stream_cluster");
-        registerTableName(StreamDefinition.class.getSimpleName(), "stream_definition");
-        registerTableName(Kafka2TupleMetadata.class.getSimpleName(), "kafka_tuple_metadata");
-        registerTableName(PolicyDefinition.class.getSimpleName(), "policy_definition");
-        registerTableName(Publishment.class.getSimpleName(), "publishment");
-        registerTableName(PublishmentType.class.getSimpleName(), "publishment_type");
-        registerTableName(ScheduleState.class.getSimpleName(), "schedule_state");
-        registerTableName(PolicyAssignment.class.getSimpleName(), "policy_assignment");
-        registerTableName(Topology.class.getSimpleName(), "topology");
-        registerTableName(AlertPublishEvent.class.getSimpleName(), "alert_event");
-    }
-
-    private static void registerTableName(String clzName, String tblName) {
-        tblNameMap.put(clzName, tblName);
-    }
-
-    public JdbcMetadataHandler(Config config) {
-        try {
-            //JdbcSchemaManager.getInstance().init(config);
-            BasicDataSource bDatasource = new BasicDataSource();
-            bDatasource.setDriverClassName(config.getString(MetadataUtils.JDBC_DRIVER_PATH));
-            if (config.hasPath(MetadataUtils.JDBC_USERNAME_PATH)) {
-                bDatasource.setUsername(config.getString(MetadataUtils.JDBC_USERNAME_PATH));
-                bDatasource.setPassword(config.getString(MetadataUtils.JDBC_PASSWORD_PATH));
-            }
-            bDatasource.setUrl(config.getString(MetadataUtils.JDBC_CONNECTION_PATH));
-            if (config.hasPath(MetadataUtils.JDBC_CONNECTION_PROPERTIES_PATH)) {
-                bDatasource.setConnectionProperties(config.getString(MetadataUtils.JDBC_CONNECTION_PROPERTIES_PATH));
-            }
-            this.dataSource = bDatasource;
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
-    private String getTableName(String clzName) {
-        String tbl = tblNameMap.get(clzName);
-        if (tbl != null) {
-            return tbl;
-        } else {
-            return clzName;
-        }
-    }
-
-    private void closeResource(ResultSet rs, PreparedStatement statement, Connection connection) {
-        if (rs != null) {
-            try {
-                rs.close();
-            } catch (SQLException e) {
-                LOG.info(e.getMessage(), e);
-            }
-        }
-        if (statement != null) {
-            try {
-                statement.close();
-            } catch (SQLException e) {
-                LOG.info("Failed to close statement: {}", e.getMessage(), e);
-            }
-        }
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (SQLException e) {
-                LOG.error("Failed to close connection: {}", e.getMessage(), e.getCause());
-            }
-        }
-    }
-
-    private OpResult executeUpdate(Connection connection, String query, String key, String value) throws SQLException {
-        OpResult result = new OpResult();
-        PreparedStatement statement = null;
-        try {
-            statement = connection.prepareStatement(query);
-            Clob clob = connection.createClob();
-            clob.setString(1, value);
-            statement.setClob(1, clob);
-            statement.setString(2, key);
-            int status = statement.executeUpdate();
-            LOG.info("update {} with query={}", status, query);
-        } finally {
-            if (statement != null) {
-                statement.close();
-            }
-        }
-        return result;
-    }
-
-
-    public <T> OpResult addOrReplace(String clzName, T t) {
-        String tb = getTableName(clzName);
-        OpResult result = new OpResult();
-        Savepoint savepoint = null;
-        String key = null;
-        String value = null;
-        Connection connection = null;
-        try {
-            connection = dataSource.getConnection();
-            key = MetadataUtils.getKey(t);
-            value = mapper.writeValueAsString(t);
-            connection.setAutoCommit(false);
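-            // optimistic insert: if it fails (typically a duplicate key), roll back
-            // to the savepoint and fall through to an UPDATE in the catch block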
-            savepoint = connection.setSavepoint("insertEntity");
-            result = executeUpdate(connection, String.format(INSERT_STATEMENT, tb), key, value);
-            connection.commit();
-        } catch (SQLException e) {
-            LOG.warn("fail to insert entity due to {}, and try to updated instead", e.getMessage());
-            if (connection != null) {
-                LOG.info("Detected duplicated entity");
-                try {
-                    connection.rollback(savepoint);
-                    executeUpdate(connection, String.format(UPDATE_STATEMENT, tb), key, value);
-                    connection.commit();
-                    connection.setAutoCommit(true);
-                } catch (SQLException e1) {
-                    LOG.warn("Rollback failed", e1);
-                }
-            }
-        } catch (JsonProcessingException e) {
-            LOG.error("Got JsonProcessingException: {}", e.getMessage(), e.getCause());
-            result.code = OpResult.FAILURE;
-            result.message = e.getMessage();
-        } finally {
-            closeResource(null, null, connection);
-        }
-        return result;
-    }
-
-
-    public <T> List<T> list(Class<T> clz) {
-        return list(clz, null);
-    }
-
-    public <T> List<T> list(Class<T> clz, SortType sortType) {
-        List<T> result = new LinkedList<T>();
-        Connection connection = null;
-        PreparedStatement statement = null;
-        try {
-            String tb = getTableName(clz.getSimpleName());
-            String query = String.format(QUERY_ALL_STATEMENT, tb);
-            if (sortType != null) {
-                query = String.format(QUERY_ORDERBY_STATEMENT, tb, sortType.toString());
-            }
-            connection = dataSource.getConnection();
-            statement = connection.prepareStatement(query);
-            return executeList(statement, clz);
-        } catch (SQLException ex) {
-            LOG.error(ex.getMessage(), ex);
-        } finally {
-            closeResource(null, statement, connection);
-        }
-        return result;
-    }
-
-    private <T> List<T> executeList(PreparedStatement statement, Class<T> clz) throws SQLException {
-        List<T> result = new LinkedList<>();
-        ResultSet rs = null;
-        try {
-            rs = statement.executeQuery();
-            while (rs.next()) {
-                try {
-                    String content = rs.getString(1);
-                    result.add(mapper.readValue(content, clz));
-                } catch (Exception e) {
-                    throw new IllegalStateException(e);
-                }
-            }
-        } finally {
-            if (rs != null) {
-                rs.close();
-            }
-        }
-        return result;
-    }
-
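-    // Overload that lets the caller map each row explicitly, instead of
-    // deserializing a single JSON content column.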
-    private <T> List<T> executeList(PreparedStatement statement, Function<ResultSet, T> selectFun) throws SQLException {
-        List<T> result = new LinkedList<>();
-        ResultSet rs = null;
-        try {
-            rs = statement.executeQuery();
-            while (rs.next()) {
-                result.add(selectFun.apply(rs));
-            }
-        } finally {
-            if (rs != null) {
-                rs.close();
-            }
-        }
-        return result;
-    }
-
-    public <T> T queryById(Class<T> clz, String id) {
-        List<T> result = new LinkedList<T>();
-        Connection connection = null;
-        PreparedStatement statement = null;
-        try {
-            String tb = getTableName(clz.getSimpleName());
-            connection = dataSource.getConnection();
-            statement = connection.prepareStatement(String.format(QUERY_CONDITION_STATEMENT, tb));
-            statement.setString(1, id);
-            result = executeList(statement, clz);
-        } catch (SQLException ex) {
-            LOG.error(ex.getMessage(), ex);
-        } finally {
-            closeResource(null, statement, connection);
-        }
-        if (result.isEmpty()) {
-            return null;
-        } else {
-            return result.get(0);
-        }
-    }
-
-    public AlertPublishEvent getAlertEventById(String alertId, int size) {
-        List<AlertPublishEvent> alerts = listAlertEvents(QUERY_ALERT_BY_ID_STATEMENT, alertId, size);
-        if (alerts.isEmpty()) {
-            return null;
-        } else {
-            return alerts.get(0);
-        }
-    }
-
-    public List<AlertPublishEvent> getAlertEventByPolicyId(String policyId, int size) {
-        return listAlertEvents(QUERY_ALERT_BY_POLICY_STATEMENT, policyId, size);
-    }
-
-    public List<AlertPublishEvent> listAlertEvents(String query, String filter, int size) {
-        List<AlertPublishEvent> alerts = new LinkedList<>();
-        Connection connection = null;
-        PreparedStatement statement = null;
-        try {
-            connection = dataSource.getConnection();
-            if (query == null) {
-                query = QUERY_ALERT_STATEMENT;
-                statement = connection.prepareStatement(query);
-                statement.setInt(1, size);
-            } else {
-                statement = connection.prepareStatement(query);
-                statement.setString(1, filter);
-                statement.setInt(2, size);
-            }
-            alerts = executeList(statement, rs -> {
-                try {
-                    AlertPublishEvent event = new AlertPublishEvent();
-                    event.setAlertId(rs.getString(1));
-                    event.setSiteId(rs.getString(2));
-                    event.setAppIds(mapper.readValue(rs.getString(3), List.class));
-                    event.setPolicyId(rs.getString(4));
-                    event.setAlertTimestamp(rs.getLong(5));
-                    event.setPolicyValue(rs.getString(6));
-                    event.setAlertData(mapper.readValue(rs.getString(7), Map.class));
-                    return event;
-                } catch (Exception e) {
-                    throw new IllegalStateException(e);
-                }
-            });
-        } catch (SQLException ex) {
-            LOG.error(ex.getMessage(), ex);
-        } finally {
-            closeResource(null, statement, connection);
-        }
-        return alerts;
-    }
-
-    public List<Publishment> listPublishments() {
-        List<Publishment> result = new LinkedList<>();
-        Connection connection = null;
-        PreparedStatement statement = null;
-        ResultSet rs = null;
-        try {
-            connection = dataSource.getConnection();
-            statement = connection.prepareStatement(QUERY_PUBLISHMENT_STATEMENT);
-            Map<String, List<String>> publishPolicyMap = new HashedMap();
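-            // group policy ids by publishment JSON; the LEFT JOIN yields a null
-            // policyId for publishments that are not bound to any policy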
-            rs = statement.executeQuery();
-            while (rs.next()) {
-                String publishment = rs.getString(1);
-                String policyId = rs.getString(2);
-                List<String> policyIds = publishPolicyMap.get(publishment);
-                if (policyIds == null) {
-                    policyIds = new ArrayList<>();
-                    publishPolicyMap.put(publishment, policyIds);
-                }
-                if (policyId != null) {
-                    policyIds.add(policyId);
-                }
-            }
-            for (Map.Entry<String, List<String>> entry : publishPolicyMap.entrySet()) {
-                Publishment publishment = mapper.readValue(entry.getKey(), Publishment.class);
-                publishment.setPolicyIds(entry.getValue());
-                result.add(publishment);
-            }
-        } catch (Exception ex) {
-            LOG.error(ex.getMessage(), ex);
-        } finally {
-            closeResource(rs, statement, connection);
-        }
-        return result;
-    }
-
-    public List<Publishment> getPublishmentsByPolicyId(String policyId) {
-        List<Publishment> result = new LinkedList<>();
-        Connection connection = null;
-        PreparedStatement statement = null;
-        try {
-            connection = dataSource.getConnection();
-            statement = connection.prepareStatement(QUERY_PUBLISHMENT_BY_POLICY_STATEMENT);
-            statement.setString(1, policyId);
-            result = executeList(statement, Publishment.class);
-        } catch (SQLException ex) {
-            LOG.error(ex.getMessage(), ex);
-        } finally {
-            closeResource(null, statement, connection);
-        }
-        return result;
-    }
-
-    public OpResult addAlertEvent(AlertPublishEvent event) {
-        Connection connection = null;
-        PreparedStatement statement = null;
-        OpResult result = new OpResult();
-        try {
-            connection = dataSource.getConnection();
-            statement = connection.prepareStatement(INSERT_ALERT_STATEMENT);
-            statement.setString(1, event.getAlertId());
-            statement.setString(2, event.getSiteId());
-            statement.setString(3, mapper.writeValueAsString(event.getAppIds()));
-            statement.setString(4, event.getPolicyId());
-            statement.setLong(5, event.getAlertTimestamp());
-            statement.setString(6, event.getPolicyValue());
-            statement.setString(7, mapper.writeValueAsString(event.getAlertData()));
-            LOG.info("start to add alert event");
-            int status = statement.executeUpdate();
-            result.code = OpResult.SUCCESS;
-            result.message = String.format("add %d records into alert_event successfully", status);
-        } catch (Exception ex) {
-            result.code = OpResult.FAILURE;
-            result.message = ex.getMessage();
-        } finally {
-            closeResource(null, statement, connection);
-        }
-        LOG.info(result.message);
-        return result;
-    }
-
-    public OpResult addPublishmentsToPolicy(String policyId, List<String> publishmentIds) {
-        OpResult result = new OpResult();
-        Connection connection = null;
-        PreparedStatement statement = null;
-        try {
-            connection = dataSource.getConnection();
-            connection.setAutoCommit(false);
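-            // replace the policy's publishment bindings atomically: delete existing
-            // mappings, then batch-insert the new ones within a single transaction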
-            statement = connection.prepareStatement(DELETE_PUBLISHMENT_STATEMENT);
-            statement.setString(1, policyId);
-            int status = statement.executeUpdate();
-            LOG.info("delete {} records from policy_publishment", status);
-            closeResource(null, statement, null);
-
-            statement = connection.prepareStatement(INSERT_POLICYPUBLISHMENT_STATEMENT);
-            for (String pub : publishmentIds) {
-                statement.setString(1, policyId);
-                statement.setString(2, pub);
-                statement.addBatch();
-            }
-            int[] num = statement.executeBatch();
-            connection.commit();
-            connection.setAutoCommit(true);
-            int sum = 0;
-            for (int i : num) {
-                sum += i;
-            }
-            result.code = OpResult.SUCCESS;
-            result.message = String.format("Add %d records into policy_publishment", sum);
-        } catch (SQLException ex) {
-            LOG.error("Error to add publishments to policy {}", policyId, ex);
-            result.code = OpResult.FAILURE;
-            result.message = ex.getMessage();
-        } finally {
-            closeResource(null, statement, connection);
-        }
-        LOG.info(result.message);
-        return result;
-    }
-
-    public OpResult removeById(String clzName, String key) {
-        Connection connection = null;
-        PreparedStatement statement = null;
-        OpResult result = new OpResult();
-        try {
-            String tb = getTableName(clzName);
-            connection = dataSource.getConnection();
-            statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb));
-            statement.setString(1, key);
-            LOG.info("start to delete records from {} with id={}", tb, key);
-            int status = statement.executeUpdate();
-            result.code = OpResult.SUCCESS;
-            result.message = String.format("removed %d records from %s successfully", status, tb);
-        } catch (SQLException ex) {
-            result.code = OpResult.FAILURE;
-            result.message = ex.getMessage();
-        } finally {
-            closeResource(null, statement, connection);
-        }
-        LOG.info(result.message);
-        return result;
-    }
-
-    public void close() throws IOException {
-        //JdbcSchemaManager.getInstance().shutdown();
-    }
-
-    public OpResult removeScheduleStates(int capacity) {
-        Connection connection = null;
-        PreparedStatement statement = null;
-        OpResult result = new OpResult();
-        try {
-            connection = dataSource.getConnection();
-            statement = connection.prepareStatement(CLEAR_SCHEDULESTATES_STATEMENT);
-            statement.setInt(1, capacity);
-            LOG.info("start to delete schedule states");
-            int status = statement.executeUpdate();
-            result.code = OpResult.SUCCESS;
-            result.message = String.format("removed %d records from schedule_state successfully", status);
-        } catch (SQLException ex) {
-            result.code = OpResult.FAILURE;
-            result.message = ex.getMessage();
-        } finally {
-            closeResource(null, statement, connection);
-        }
-        LOG.info(result.message);
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
deleted file mode 100644
index a02c51e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.metadata.impl;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import com.typesafe.config.Config;
-import org.apache.ddlutils.Platform;
-import org.apache.ddlutils.PlatformFactory;
-import org.apache.ddlutils.model.Column;
-import org.apache.ddlutils.model.Database;
-import org.apache.ddlutils.model.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-@Deprecated
-public class JdbcSchemaManager {
-
-    private static final Logger LOG = LoggerFactory.getLogger(JdbcSchemaManager.class);
-    private Database database;
-    private Platform platform;
-
-    private static JdbcSchemaManager instance;
-
-    public static Map<String, String> tblNameMap = new HashMap<>();
-
-    private JdbcSchemaManager() {
-    }
-
-    private static void registerTableName(String clzName, String tblName) {
-        tblNameMap.put(clzName, tblName);
-    }
-
-    static {
-        registerTableName(StreamingCluster.class.getSimpleName(), "cluster");
-        registerTableName(StreamDefinition.class.getSimpleName(), "stream_schema");
-        registerTableName(Kafka2TupleMetadata.class.getSimpleName(), "datasource");
-        registerTableName(PolicyDefinition.class.getSimpleName(), "policy");
-        registerTableName(Publishment.class.getSimpleName(), "publishment");
-        registerTableName(PublishmentType.class.getSimpleName(), "publishment_type");
-        registerTableName(ScheduleState.class.getSimpleName(), "schedule_state");
-        registerTableName(PolicyAssignment.class.getSimpleName(), "assignment");
-        registerTableName(Topology.class.getSimpleName(), "topology");
-        registerTableName(AlertPublishEvent.class.getSimpleName(), "alert_event");
-    }
-
-    public static JdbcSchemaManager getInstance() {
-        if (instance == null) {
-            instance = new JdbcSchemaManager();
-        }
-        return instance;
-    }
-
-    public void init(Config config) {
-        Connection connection = null;
-        try {
-            this.platform = PlatformFactory.createNewPlatformInstance("mysql");
-
-            connection = MetadataUtils.getJdbcConnection(config);
-            String dbName = config.getString(MetadataUtils.JDBC_DATABASE_PATH);
-            this.database = platform.readModelFromDatabase(connection, dbName);
-            LOG.info("Loaded " + database);
-
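-            // create only the tables that are missing from the live schema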
-            Database _database = identifyNewTables();
-            if (_database.getTableCount() > 0) {
-                LOG.info("Creating {} new tables (totally {} tables)", _database.getTableCount(), database.getTableCount());
-                this.platform.createTables(connection, _database, false, true);
-                LOG.info("Created {} new tables: ", _database.getTableCount(), _database.getTables());
-            } else {
-                LOG.debug("All the {} tables have already been created, no new tables", database.getTableCount());
-            }
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-            throw new IllegalStateException(e);
-        } finally {
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (SQLException e) {
-                    LOG.warn(e.getMessage(), e);
-                }
-            }
-        }
-    }
-
-    private Database identifyNewTables() {
-        Database _database = new Database();
-        _database.setName(database.getName());
-        Collection<String> tableNames = tblNameMap.values();
-        LOG.info("Initializing database and creating tables");
-        for (String tableName : tableNames) {
-            if (database.findTable(tableName) == null) {
-                Table table = createTable(tableName);
-                LOG.info("Creating {}", table.toVerboseString());
-                _database.addTable(table);
-                database.addTable(table);
-            } else {
-                LOG.debug("Table {} already exists", tableName);
-            }
-        }
-        return _database;
-    }
-
-    public void shutdown() {
-        this.platform.shutdownDatabase();
-    }
-
-    private Table createTable(String tableName) {
-        Table table = new Table();
-        table.setName(tableName);
-        buildTable(table);
-        return table;
-    }
-
-    private void buildTable(Table table) {
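-        // generic key/value layout: a VARCHAR(50) primary key plus a CLOB column
-        // holding the entity serialized as JSON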
-        Column id = new Column();
-        id.setName("id");
-        id.setPrimaryKey(true);
-        id.setRequired(true);
-        id.setTypeCode(Types.VARCHAR);
-        id.setSize("50");
-        table.addColumn(id);
-
-        Column value = new Column();
-        value.setName("value");
-        value.setTypeCode(Types.CLOB);
-        table.addColumn(value);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java
deleted file mode 100644
index 5082e50..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.impl;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Constructor;
-
-/**
- * @since Apr 12, 2016.
- */
-public class MetadataDaoFactory {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MetadataDaoFactory.class);
-
-    private static final MetadataDaoFactory INSTANCE = new MetadataDaoFactory();
-
-    private IMetadataDao dao;
-
-    private MetadataDaoFactory() {
-        Config config = ConfigFactory.load();
-        if (!config.hasPath(MetadataUtils.META_DATA)) {
-            LOG.warn("metadata is not configured, use in-memory store !!!");
-            dao = new InMemMetadataDaoImpl(null);
-        } else {
-            Config metaDataConfig = config.getConfig(MetadataUtils.META_DATA);
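-            // load the configured DAO implementation reflectively; fall back to the
-            // in-memory store if loading or instantiation fails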
-            try {
-                String clsName = metaDataConfig.getString(MetadataUtils.ALERT_META_DATA_DAO);
-                Class<?> clz;
-                clz = Thread.currentThread().getContextClassLoader().loadClass(clsName);
-                if (IMetadataDao.class.isAssignableFrom(clz)) {
-                    Constructor<?> cotr = clz.getConstructor(Config.class);
-                    LOG.info("metadata.alertMetadataDao loaded: " + clsName);
-                    dao = (IMetadataDao) cotr.newInstance(metaDataConfig);
-                } else {
-                    throw new Exception("metadata.metadataDao configuration need to be implementation of IMetadataDao! ");
-                }
-            } catch (Exception e) {
-                LOG.error("error when initialize the dao, fall back to in memory mode!", e);
-                dao = new InMemMetadataDaoImpl(metaDataConfig);
-            }
-        }
-    }
-
-    public static MetadataDaoFactory getInstance() {
-        return INSTANCE;
-    }
-
-    public IMetadataDao getMetadataDao() {
-        return dao;
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
deleted file mode 100644
index 2325f90..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
+++ /dev/null
@@ -1,753 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.VersionedPolicyDefinition;
-import org.apache.eagle.alert.coordination.model.VersionedStreamDefinition;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.ScheduleStateBase;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.resource.Models;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-import org.bson.BsonDocument;
-import org.bson.BsonInt32;
-import org.bson.BsonString;
-import org.bson.Document;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.inject.Inject;
-import com.mongodb.Block;
-import com.mongodb.Function;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.MongoIterable;
-import com.mongodb.client.model.CreateCollectionOptions;
-import com.mongodb.client.model.IndexOptions;
-import com.mongodb.client.model.UpdateOptions;
-import com.mongodb.client.result.DeleteResult;
-import com.mongodb.client.result.UpdateResult;
-import com.typesafe.config.Config;
-
-/**
- * @since Apr 11, 2016.
- */
-public class MongoMetadataDaoImpl implements IMetadataDao {
-
-    private static final String DEFAULT_DB_NAME = "ump_alert_metadata";
-    private static final Logger LOG = LoggerFactory.getLogger(MongoMetadataDaoImpl.class);
-    private static final ObjectMapper mapper = new ObjectMapper();
-    private static final int DEFAULT_CAPPED_MAX_SIZE = 500 * 1024 * 1024;
-    private static final int DEFAULT_CAPPED_MAX_DOCUMENTS = 20000;
-    private static final String MONGO_CAPPED_MAX_SIZE = "mongo.cappedMaxSize";
-    private static final String MONGO_CAPPED_MAX_DOCUMENTS = "mongo.cappedMaxDocuments";
-
-    static {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-    }
-
-    private final String connection;
-    private final String dbname;
-    private final MongoClient client;
-    private final int cappedMaxSize;
-    private final int cappedMaxDocuments;
-
-    private MongoDatabase db;
-    private MongoCollection<Document> cluster;
-    private MongoCollection<Document> schema;
-    private MongoCollection<Document> datasource;
-    private MongoCollection<Document> policy;
-    private MongoCollection<Document> publishment;
-    private MongoCollection<Document> publishmentType;
-    private MongoCollection<Document> topologies;
-    private MongoCollection<Document> alerts;
-
-    // the schedule state is split across several collections
-    private MongoCollection<Document> scheduleStates;
-    private MongoCollection<Document> spoutSpecs;
-    private MongoCollection<Document> alertSpecs;
-    private MongoCollection<Document> groupSpecs;
-    private MongoCollection<Document> publishSpecs;
-    private MongoCollection<Document> policySnapshots;
-    private MongoCollection<Document> streamSnapshots;
-    private MongoCollection<Document> monitoredStreams;
-    private MongoCollection<Document> assignments;
-
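-    // Illustrative configuration sketch: the connection and database key names
-    // come from MetadataUtils (not shown here); the capped-collection keys and
-    // defaults are the literals defined above, e.g.
-    //   mongo.cappedMaxSize = 524288000      (500 * 1024 * 1024, the default)
-    //   mongo.cappedMaxDocuments = 20000     (the default)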
-    @Inject
-    public MongoMetadataDaoImpl(Config config) {
-        this.connection = config.getString(MetadataUtils.MONGO_CONNECTION_PATH);
-        this.cappedMaxSize = config.hasPath(MONGO_CAPPED_MAX_SIZE) ? config.getInt(MONGO_CAPPED_MAX_SIZE) : DEFAULT_CAPPED_MAX_SIZE;
-        this.cappedMaxDocuments = config.hasPath(MONGO_CAPPED_MAX_DOCUMENTS) ? config.getInt(MONGO_CAPPED_MAX_DOCUMENTS) : DEFAULT_CAPPED_MAX_DOCUMENTS;
-        this.client = new MongoClient(new MongoClientURI(this.connection));
-        this.dbname = config.hasPath(MetadataUtils.MONGO_DATABASE) ? config.getString(MetadataUtils.MONGO_DATABASE) : DEFAULT_DB_NAME;
-        init();
-    }
-
-    private boolean isCollectionExists(String collectionName) {
-        MongoIterable<String> allCollections = db.listCollectionNames();
-        for (String collection : allCollections) {
-            if (collection.equals(collectionName)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private MongoCollection<Document> getCollection(String collectionName) {
-        // first check if the collection exists; if not, create it as a capped collection
-        if (!isCollectionExists(collectionName)) {
-            CreateCollectionOptions option = new CreateCollectionOptions();
-            option.capped(true);
-            option.maxDocuments(cappedMaxDocuments);
-            option.sizeInBytes(cappedMaxSize);
-            db.createCollection(collectionName, option);
-        }
-        return db.getCollection(collectionName);
-    }
-
-    private void init() {
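-        // the base metadata collections here are created as regular collections;
-        // the schedule-state collections below use getCollection(), which
-        // creates them as capped collections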
-        db = client.getDatabase(this.dbname);
-        IndexOptions io = new IndexOptions().background(true).name("nameIndex");
-        BsonDocument doc = new BsonDocument();
-        doc.append("name", new BsonInt32(1));
-        cluster = db.getCollection("clusters");
-        cluster.createIndex(doc, io);
-        {
-            BsonDocument doc2 = new BsonDocument();
-            doc2.append("streamId", new BsonInt32(1));
-            schema = db.getCollection("schemas");
-            schema.createIndex(doc2, io);
-        }
-        datasource = db.getCollection("datasources");
-        datasource.createIndex(doc, io);
-        policy = db.getCollection("policies");
-        policy.createIndex(doc, io);
-        publishment = db.getCollection("publishments");
-        publishment.createIndex(doc, io);
-        topologies = db.getCollection("topologies");
-        topologies.createIndex(doc, io);
-        publishmentType = db.getCollection("publishmentTypes");
-        publishmentType.createIndex(doc, io);
-
-        alerts = db.getCollection("alerts");
-        {
-            IndexOptions io1 = new IndexOptions().background(true).unique(true).name("alertIndex");
-            BsonDocument doc1 = new BsonDocument();
-            doc1.append("alertId", new BsonInt32(1));
-            alerts.createIndex(doc1, io1);
-        }
-
-        // below is for schedule_specs and its split collections
-        BsonDocument doc1 = new BsonDocument();
-        IndexOptions io1 = new IndexOptions().background(true).name("versionIndex");
-        doc1.append("version", new BsonInt32(1));
-        scheduleStates = getCollection("schedule_specs");
-        scheduleStates.createIndex(doc1, io1);
-
-        spoutSpecs = getCollection("spoutSpecs");
-        {
-            IndexOptions ioInternal = new IndexOptions().background(true).name("topologyIdIndex");
-            BsonDocument docInternal = new BsonDocument();
-            docInternal.append("topologyId", new BsonInt32(1));
-            spoutSpecs.createIndex(docInternal, ioInternal);
-        }
-
-        alertSpecs = getCollection("alertSpecs");
-        {
-            IndexOptions ioInternal = new IndexOptions().background(true).name("topologyNameIndex");
-            BsonDocument docInternal = new BsonDocument();
-            docInternal.append("topologyName", new BsonInt32(1));
-            alertSpecs.createIndex(docInternal, ioInternal);
-        }
-
-        groupSpecs = getCollection("groupSpecs");
-        groupSpecs.createIndex(doc1, io1);
-
-        publishSpecs = getCollection("publishSpecs");
-        publishSpecs.createIndex(doc1, io1);
-
-        policySnapshots = getCollection("policySnapshots");
-        policySnapshots.createIndex(doc1, io1);
-
-        streamSnapshots = getCollection("streamSnapshots");
-        streamSnapshots.createIndex(doc1, io1);
-
-        monitoredStreams = getCollection("monitoredStreams");
-        monitoredStreams.createIndex(doc1, io1);
-
-        assignments = getCollection("assignments");
-        assignments.createIndex(doc1, io1);
-    }
-
-    @Override
-    public List<StreamingCluster> listClusters() {
-        return list(cluster, StreamingCluster.class);
-    }
-
-
-    private <T> OpResult addOrReplace(MongoCollection<Document> collection, T t) {
-        BsonDocument filter = new BsonDocument();
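-        // choose the upsert key by type: streams key on streamId, alert events
-        // on alertId, everything else on name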
-        if (t instanceof StreamDefinition) {
-            filter.append("streamId", new BsonString(MetadataUtils.getKey(t)));
-        } else if (t instanceof AlertPublishEvent) {
-            filter.append("alertId", new BsonString(MetadataUtils.getKey(t)));
-        } else {
-            filter.append("name", new BsonString(MetadataUtils.getKey(t)));
-        }
-
-        String json = "";
-        OpResult result = new OpResult();
-        try {
-            json = mapper.writeValueAsString(t);
-            UpdateOptions options = new UpdateOptions();
-            options.upsert(true);
-            UpdateResult ur = collection.replaceOne(filter, Document.parse(json), options);
-            // FIXME: could use the matched count for more precise result reporting...
-            if (ur.getModifiedCount() > 0 || ur.getUpsertedId() != null) {
-                result.code = 200;
-                result.message = String.format("updated %d configuration item(s).", ur.getModifiedCount());
-            } else {
-                result.code = 500;
-                result.message = "no configuration item created/updated.";
-            }
-            }
-        } catch (Exception e) {
-            result.code = 500;
-            result.message = e.getMessage();
-            LOG.error("", e);
-        }
-        return result;
-    }
-
-    private <T> OpResult remove(MongoCollection<Document> collection, String name) {
-        return removeObject(collection, "name", name);
-    }
-
-    private <T> OpResult removeObject(MongoCollection<Document> collection, String nameField, String name) {
-        BsonDocument filter = new BsonDocument();
-        filter.append(nameField, new BsonString(name));
-        DeleteResult dr = collection.deleteOne(filter);
-        OpResult result = new OpResult();
-        result.code = 200;
-        result.message = String.format(" %d config item removed!", dr.getDeletedCount());
-        return result;
-    }
-
-    @Override
-    public OpResult addCluster(StreamingCluster cluster) {
-        return addOrReplace(this.cluster, cluster);
-    }
-
-    @Override
-    public OpResult removeCluster(String clusterId) {
-        return remove(cluster, clusterId);
-    }
-
-    @Override
-    public List<StreamDefinition> listStreams() {
-        return list(schema, StreamDefinition.class);
-    }
-
-    @Override
-    public OpResult createStream(StreamDefinition stream) {
-        return addOrReplace(this.schema, stream);
-    }
-
-    @Override
-    public OpResult removeStream(String streamId) {
-        return removeObject(schema, "streamId", streamId);
-    }
-
-    @Override
-    public List<Kafka2TupleMetadata> listDataSources() {
-        return list(datasource, Kafka2TupleMetadata.class);
-    }
-
-    @Override
-    public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
-        return addOrReplace(this.datasource, dataSource);
-    }
-
-    @Override
-    public OpResult removeDataSource(String datasourceId) {
-        return remove(datasource, datasourceId);
-    }
-
-    @Override
-    public List<PolicyDefinition> listPolicies() {
-        return list(policy, PolicyDefinition.class);
-    }
-
-    @Override
-    public OpResult addPolicy(PolicyDefinition policy) {
-        return addOrReplace(this.policy, policy);
-    }
-
-    @Override
-    public OpResult removePolicy(String policyId) {
-        return remove(policy, policyId);
-    }
-
-    @Override
-    public List<Publishment> listPublishment() {
-        return list(publishment, Publishment.class);
-    }
-
-    @Override
-    public OpResult addPublishment(Publishment publishment) {
-        return addOrReplace(this.publishment, publishment);
-    }
-
-    @Override
-    public OpResult removePublishment(String pubId) {
-        return remove(publishment, pubId);
-    }
-
-    @Override
-    public List<PublishmentType> listPublishmentType() {
-        return list(publishmentType, PublishmentType.class);
-    }
-
-    @Override
-    public OpResult addPublishmentType(PublishmentType pubType) {
-        return addOrReplace(this.publishmentType, pubType);
-    }
-
-    @Override
-    public OpResult removePublishmentType(String pubType) {
-        return remove(publishmentType, pubType);
-    }
-
-    @Override
-    public List<AlertPublishEvent> listAlertPublishEvent(int size) {
-        List<AlertPublishEvent> result = list(alerts, AlertPublishEvent.class);
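-        // a negative size, or one larger than the result, returns all events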
-        if (size < 0 || size > result.size()) {
-            size = result.size();
-        }
-        return result.subList(result.size() - size, result.size());
-    }
-
-    @Override
-    public AlertPublishEvent getAlertPublishEvent(String alertId) {
-        List<AlertPublishEvent> results = list(alerts, AlertPublishEvent.class);
-        Optional<AlertPublishEvent> op = results.stream().filter(alert -> alert.getAlertId().equals(alertId)).findAny();
-        if (op.isPresent()) {
-            return op.get();
-        }
-        return null;
-    }
-
-    @Override
-    public List<AlertPublishEvent> getAlertPublishEventsByPolicyId(String policyId, int size) {
-        List<AlertPublishEvent> events = list(alerts, AlertPublishEvent.class);
-        List<AlertPublishEvent> result = events.stream().filter(alert -> alert.getPolicyId().equals(policyId)).collect(Collectors.toList());
-        if (size < 0 || size > result.size()) {
-            size = result.size();
-        }
-        // take the tail of the filtered result, not of the raw events list
-        return result.subList(result.size() - size, result.size());
-    }
-
-    @Override
-    public OpResult addAlertPublishEvent(AlertPublishEvent event) {
-        return addOrReplace(alerts, event);
-    }
-
-    private <T> OpResult addOne(MongoCollection<Document> collection, T t) {
-        OpResult result = new OpResult();
-        String json = "";
-        try {
-            json = mapper.writeValueAsString(t);
-            collection.insertOne(Document.parse(json));
-            result.code = 200;
-            result.message = String.format("add one document [%s] to collection [%s] succeed!", json, collection.getNamespace());
-            LOG.info(result.message);
-        } catch (Exception e) {
-            result.code = 400;
-            result.message = e.getMessage();
-            LOG.error(String.format("Add one document [%s] to collection [%s] failed!", json, collection.getNamespace()), e);
-        }
-        return result;
-    }
-
-    /**
-     * Some field names in SpoutSpec contain a dot (.), which is not a valid Mongo
-     * field name, so we transform the format before storing it in Mongo.
-     * @return the result of the insert operation
-     */
-    private <T> OpResult addOneSpoutSpec(T t) {
-        OpResult result = new OpResult();
-        String json = "";
-        try {
-            json = mapper.writeValueAsString(t);
-            Document doc = Document.parse(json);
-
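-            // e.g. a map entry {"kafka2TupleMetadataMap": {"topic.with.dots": {...}}}
-            // becomes an array of documents keyed by "topicName":
-            //   [{"topicName": "topic.with.dots", "kafka2TupleMetadataMap": {...}}]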
-            String[] metadataMapNames = {"kafka2TupleMetadataMap", "tuple2StreamMetadataMap", "streamRepartitionMetadataMap"};
-            for (String metadataMapName : metadataMapNames) {
-                Document metadataMapDoc = (Document) doc.get(metadataMapName);
-                doc.remove(metadataMapName);
-
-                ArrayList<Document> metadataMapArray = new ArrayList<>();
-                for (String key : metadataMapDoc.keySet()) {
-                    Document subDoc = new Document();
-                    subDoc.put("topicName", key);
-                    subDoc.put(metadataMapName, metadataMapDoc.get(key));
-                    metadataMapArray.add(subDoc);
-                }
-                doc.append(metadataMapName, metadataMapArray);
-            }
-
-            spoutSpecs.insertOne(doc);
-            result.code = 200;
-            result.message = String.format("add one document [%s] to collection [%s] succeed!", doc.toJson(), spoutSpecs.getNamespace());
-            LOG.info(result.message);
-        } catch (Exception e) {
-            result.code = 400;
-            result.message = e.getMessage();
-            LOG.error(String.format("Add one document [%s] to collection [%s] failed!", json, spoutSpecs.getNamespace()), e);
-        }
-        return result;
-    }
-
-    @Override
-    public ScheduleState getScheduleState(String versionId) {
-        BsonDocument doc = new BsonDocument();
-        doc.append("version", new BsonString(versionId));
-        ScheduleState state = scheduleStates.find(doc).map(new Function<Document, ScheduleState>() {
-            @Override
-            public ScheduleState apply(Document t) {
-                String json = t.toJson();
-                try {
-                    return mapper.readValue(json, ScheduleState.class);
-                } catch (IOException e) {
-                    LOG.error("deserialize config item failed!", e);
-                }
-                return null;
-            }
-        }).first();
-
-        if (state != null) {
-            // based on the version, add content from the spoutSpecs/alertSpecs/etc. collections
-            state = addDetailForScheduleState(state, versionId);
-        }
-
-        return state;
-    }
-
-    /**
-     * Gets the basic ScheduleState, then uses its version to load all sub-parts
-     * (spoutSpecs/alertSpecs/etc.) and assemble a complete ScheduleState.
-     * @return the latest ScheduleState
-     */
-    @Override
-    public ScheduleState getScheduleState() {
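-        // sort by generateTime descending so first() yields the most recent state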
-        BsonDocument sort = new BsonDocument();
-        sort.append("generateTime", new BsonInt32(-1));
-        ScheduleState state = scheduleStates.find().sort(sort).map(new Function<Document, ScheduleState>() {
-            @Override
-            public ScheduleState apply(Document t) {
-                String json = t.toJson();
-                try {
-                    return mapper.readValue(json, ScheduleState.class);
-                } catch (IOException e) {
-                    LOG.error("deserialize config item failed!", e);
-                }
-                return null;
-            }
-        }).first();
-
-        if (state != null) {
-            String version = state.getVersion();
-            // based on the version, add content from the spoutSpecs/alertSpecs/etc. collections
-            state = addDetailForScheduleState(state, version);
-        }
-
-        return state;
-    }
-
-    @Override
-    public List<ScheduleState> listScheduleStates() {
-        throw new UnsupportedOperationException("listScheduleStates not support!");
-    }
-
-    @Override
-    public OpResult clearScheduleState(int maxCapacity) {
-        throw new UnsupportedOperationException("clearScheduleState not support!");
-    }
-
-    private ScheduleState addDetailForScheduleState(ScheduleState state, String version) {
-        Map<String, SpoutSpec> spoutMaps = maps(spoutSpecs, SpoutSpec.class, version);
-        if (spoutMaps.size() != 0) {
-            state.setSpoutSpecs(spoutMaps);
-        }
-
-        Map<String, AlertBoltSpec> alertMaps = maps(alertSpecs, AlertBoltSpec.class, version);
-        if (alertMaps.size() != 0) {
-            state.setAlertSpecs(alertMaps);
-        }
-
-        Map<String, RouterSpec> groupMaps = maps(groupSpecs, RouterSpec.class, version);
-        if (groupMaps.size() != 0) {
-            state.setGroupSpecs(groupMaps);
-        }
-
-        Map<String, PublishSpec> publishMaps = maps(publishSpecs, PublishSpec.class, version);
-        if (publishMaps.size() != 0) {
-            state.setPublishSpecs(publishMaps);
-        }
-
-        List<VersionedPolicyDefinition> policyLists = list(policySnapshots, VersionedPolicyDefinition.class, version);
-        if (policyLists.size() != 0) {
-            state.setPolicySnapshots(policyLists);
-        }
-
-        List<VersionedStreamDefinition> streamLists = list(streamSnapshots, VersionedStreamDefinition.class, version);
-        if (streamLists.size() != 0) {
-            state.setStreamSnapshots(streamLists);
-        }
-
-        List<MonitoredStream> monitorLists = list(monitoredStreams, MonitoredStream.class, version);
-        if (monitorLists.size() != 0) {
-            state.setMonitoredStreams(monitorLists);
-        }
-
-        List<PolicyAssignment> assignmentLists = list(assignments, PolicyAssignment.class, version);
-        if (assignmentLists.size() != 0) {
-            state.setAssignments(assignmentLists);
-        }
-        return state;
-    }
-
-    private <T> Map<String, T> maps(MongoCollection<Document> collection, Class<T> clz, String version) {
-        BsonDocument doc = new BsonDocument();
-        doc.append("version", new BsonString(version));
-
-        Map<String, T> maps = new HashMap<String, T>();
-        String mapKey = (clz == SpoutSpec.class) ? "topologyId" : "topologyName";
-        collection.find(doc).forEach(new Block<Document>() {
-            @Override
-            public void apply(Document document) {
-                String json = document.toJson();
-                try {
-                    // Some field names in SpoutSpec contain a dot (.), which is not a valid
-                    // Mongo field name, so we transform the format back while reading from Mongo.
-                    if (clz == SpoutSpec.class) {
-                        Document doc = Document.parse(json);
-                        String[] metadataMapNames = {"kafka2TupleMetadataMap", "tuple2StreamMetadataMap", "streamRepartitionMetadataMap"};
-                        for (String metadataMapName : metadataMapNames) {
-                            List<Document> subDocs = (List<Document>) doc.get(metadataMapName);
-                            doc.remove(metadataMapName);
-
-                            Document replaceDoc = new Document();
-                            for (Document subDoc : subDocs) {
-                                replaceDoc.put(subDoc.getString("topicName"), subDoc.get(metadataMapName));
-                            }
-                            doc.put(metadataMapName, replaceDoc);
-                        }
-
-                        json = doc.toJson();
-                    }
-                    T t = mapper.readValue(json, clz);
-                    maps.put(document.getString(mapKey), t);
-                } catch (IOException e) {
-                    LOG.error("deserialize config item failed!", e);
-                }
-            }
-        });
-
-        return maps;
-    }
-
-    private <T> List<T> list(MongoCollection<Document> collection, Class<T> clz, String version) {
-        BsonDocument doc = new BsonDocument();
-        doc.append("version", new BsonString(version));
-
-        List<T> result = new LinkedList<T>();
-        collection.find(doc).map(new Function<Document, T>() {
-            @Override
-            public T apply(Document t) {
-                String json = t.toJson();
-                try {
-                    return mapper.readValue(json, clz);
-                } catch (IOException e) {
-                    LOG.error("deserialize config item failed!", e);
-                }
-                return null;
-            }
-        }).into(result);
-        return result;
-    }
-
-    private <T> List<T> list(MongoCollection<Document> collection, Class<T> clz) {
-        List<T> result = new LinkedList<T>();
-        collection.find().map(new Function<Document, T>() {
-            @Override
-            public T apply(Document t) {
-                String json = t.toJson();
-                try {
-                    return mapper.readValue(json, clz);
-                } catch (IOException e) {
-                    LOG.error("deserialize config item failed!", e);
-                }
-                return null;
-            }
-        }).into(result);
-        return result;
-    }
-
-    /**
-     * Writes a ScheduleState across several collections: the basic info goes to
-     * schedule_specs, while the sub-parts go to the collections named
-     * spoutSpecs/alertSpecs/etc.
-     *
-     * @param state the schedule state to persist
-     * @return the result of the write operation
-     */
-    @Override
-    public OpResult addScheduleState(ScheduleState state) {
-        OpResult result = new OpResult();
-        try {
-            for (SpoutSpec spoutSpec : state.getSpoutSpecs().values()) {
-                addOneSpoutSpec(spoutSpec);
-            }
-
-            for (AlertBoltSpec alertBoltSpec : state.getAlertSpecs().values()) {
-                addOne(alertSpecs, alertBoltSpec);
-            }
-
-            for (RouterSpec groupSpec : state.getGroupSpecs().values()) {
-                addOne(groupSpecs, groupSpec);
-            }
-
-            for (PublishSpec publishSpec : state.getPublishSpecs().values()) {
-                addOne(publishSpecs, publishSpec);
-            }
-
-            for (VersionedPolicyDefinition policySnapshot : state.getPolicySnapshots()) {
-                addOne(policySnapshots, policySnapshot);
-            }
-
-            for (VersionedStreamDefinition streamSnapshot : state.getStreamSnapshots()) {
-                addOne(streamSnapshots, streamSnapshot);
-            }
-
-            for (MonitoredStream monitoredStream : state.getMonitoredStreams()) {
-                addOne(monitoredStreams, monitoredStream);
-            }
-
-            for (PolicyAssignment assignment : state.getAssignments()) {
-                addOne(assignments, assignment);
-            }
-
-            ScheduleStateBase stateBase = new ScheduleStateBase(
-                    state.getVersion(), state.getGenerateTime(), state.getCode(),
-                    state.getMessage(), state.getScheduleTimeMillis());
-
-            addOne(scheduleStates, stateBase);
-
-            result.code = 200;
-            result.message = "add document to collection schedule_specs succeed";
-        } catch (Exception e) {
-            result.code = 400;
-            result.message = e.getMessage();
-            LOG.error("", e);
-        }
-        return result;
-    }
-
-    @Override
-    public List<PolicyAssignment> listAssignments() {
-        return list(assignments, PolicyAssignment.class);
-    }
-
-    @Override
-    public OpResult addAssignment(PolicyAssignment assignment) {
-        return addOne(assignments, assignment);
-    }
-
-    @Override
-    public List<Topology> listTopologies() {
-        return list(topologies, Topology.class);
-    }
-
-    @Override
-    public OpResult addTopology(Topology t) {
-        return addOrReplace(this.topologies, t);
-    }
-
-    @Override
-    public OpResult removeTopology(String topologyName) {
-        return remove(topologies, topologyName);
-    }
-
-    @Override
-    public OpResult clear() {
-        throw new UnsupportedOperationException("clear not support!");
-    }
-
-    @Override
-    public Models export() {
-        throw new UnsupportedOperationException("export not support!");
-    }
-
-    @Override
-    public OpResult importModels(Models models) {
-        throw new UnsupportedOperationException("importModels not support!");
-    }
-
-    @Override
-    public void close() throws IOException {
-        client.close();
-    }
-
-}
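A minimal construction sketch for the Mongo-backed DAO above. The literal key
"connection" is an assumption for illustration; the real key name is whatever
MetadataUtils.MONGO_CONNECTION_PATH resolves to:

    // Hand the DAO a Config carrying a standard Mongo URI, then use and close it.
    Config metaDataConfig = ConfigFactory.parseString(
            "connection = \"mongodb://localhost:27017\"");
    IMetadataDao dao = new MongoMetadataDaoImpl(metaDataConfig);
    List<PolicyDefinition> policies = dao.listPolicies();
    dao.close();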

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java
deleted file mode 100644
index 2463e5b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.resource;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-/**
- * These models are used for metadata export/import, to make testing easier.
- *
- * @since May 23, 2016
- */
-public class Models {
-    public List<StreamingCluster> clusters = new ArrayList<StreamingCluster>();
-    public List<StreamDefinition> schemas = new ArrayList<StreamDefinition>();
-    public List<Kafka2TupleMetadata> datasources = new ArrayList<Kafka2TupleMetadata>();
-    public List<PolicyDefinition> policies = new ArrayList<PolicyDefinition>();
-    public List<Publishment> publishments = new ArrayList<Publishment>();
-    public SortedMap<String, ScheduleState> scheduleStates = new TreeMap<String, ScheduleState>();
-    public List<PolicyAssignment> assignments = new ArrayList<PolicyAssignment>();
-    public List<Topology> topologies = new ArrayList<Topology>();
-}
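A brief round-trip sketch for the container above (illustrative only), using the
same Jackson ObjectMapper style seen elsewhere in this module:

    // Serialize a Models snapshot to JSON and read it back, e.g. as a test fixture.
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(models);
    Models restored = mapper.readValue(json, Models.class);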