You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/11/29 07:14:11 UTC

incubator-eagle git commit: add ScheduleStateCleaner.java

Repository: incubator-eagle
Updated Branches:
  refs/heads/master c69f94eff -> 021c2bddd


add ScheduleStateCleaner.java

https://issues.apache.org/jira/browse/EAGLE-803

Author: Zhao, Qingwen <qi...@apache.org>

Closes #694 from qingwen220/EAGLE-803.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/021c2bdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/021c2bdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/021c2bdd

Branch: refs/heads/master
Commit: 021c2bddddd98892442f43d0425c67effa192273
Parents: c69f94e
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Tue Nov 29 15:14:04 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Tue Nov 29 15:14:04 2016 +0800

----------------------------------------------------------------------
 .../engine/coordinator/PublishmentType.java     |   9 +-
 .../alert/service/IMetadataServiceClient.java   |   2 +
 .../service/MetadataServiceClientImpl.java      |   7 +
 .../engine/coordinator/PublishmentTypeTest.java |   3 +-
 .../eagle/alert/coordinator/Coordinator.java    |  12 ++
 .../trigger/ScheduleStateCleaner.java           |  53 +++++++
 .../mock/InMemMetadataServiceClient.java        |   5 +
 .../integration/MockMetadataServiceClient.java  |   5 +
 .../metadata/resource/MetadataResource.java     |  10 +-
 .../alert-metadata/pom.xml                      |   4 +
 .../eagle/alert/metadata/IMetadataDao.java      |   7 +-
 .../metadata/impl/InMemMetadataDaoImpl.java     |  11 ++
 .../metadata/impl/JdbcDatabaseHandler.java      | 106 ++++++++++++--
 .../metadata/impl/JdbcMetadataDaoImpl.java      |  29 +++-
 .../alert/metadata/impl/JdbcSchemaManager.java  |   1 +
 .../metadata/impl/MongoMetadataDaoImpl.java     |  11 ++
 .../eagle/alert/metadata/impl/InMemoryTest.java |   4 +-
 .../eagle/alert/metadata/impl/JdbcImplTest.java |  51 +++++--
 .../src/test/resources/application-jdbc.conf    |  25 ++++
 .../src/test/resources/application-mysql.conf   |  23 ----
 .../alert-metadata/src/test/resources/init.sql  |  70 ++++++++++
 eagle-server-assembly/src/main/conf/eagle.conf  | 138 ++++++++++---------
 .../src/main/resources/application.conf         |   2 +
 23 files changed, 462 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
index 5329dfa..2718cfe 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
@@ -21,6 +21,8 @@ package org.apache.eagle.alert.engine.coordinator;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 @JsonIgnoreProperties(ignoreUnknown = true)
@@ -29,7 +31,8 @@ public class PublishmentType {
     private String type;
     private String className;
     private String description;
-    private String fields;
+
+    private List<Map<String, String>> fields;
 
     public String getType() {
         return type;
@@ -55,11 +58,11 @@ public class PublishmentType {
         this.description = description;
     }
 
-    public String getFields() {
+    public List<Map<String, String>> getFields() {
         return fields;
     }
 
-    public void setFields(String fields) {
+    public void setFields(List<Map<String, String>> fields) {
         this.fields = fields;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
index b00fc78..efa6d0e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
@@ -86,6 +86,8 @@ public interface IMetadataServiceClient extends Closeable, Serializable {
 
     void clear();
 
+    void clearScheduleState(int maxCapacity);
+
     // for topology mgmt
 
     // for alert event

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
index 209a3a6..8571e56 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
@@ -69,6 +69,7 @@ public class MetadataServiceClientImpl implements IMetadataServiceClient {
     private static final String METADATA_ALERTS_BATCH_PATH = "/metadata/alerts/batch";
 
     private static final String METADATA_CLEAR_PATH = "/metadata/clear";
+    private static final String METADATA_CLEAR_SCHEDULESTATES_PATH = "/metadata/clear/schedulestates";
 
     private static final String EAGLE_CORRELATION_CONTEXT = "metadataService.context";
     public static final String EAGLE_CORRELATION_SERVICE_PORT = "metadataService.port";
@@ -279,6 +280,12 @@ public class MetadataServiceClientImpl implements IMetadataServiceClient {
     }
 
     @Override
+    public void clearScheduleState(int maxCapacity) {
+        WebResource r = client.resource(basePath + METADATA_CLEAR_SCHEDULESTATES_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(maxCapacity);
+    }
+
+    @Override
     public List<AlertPublishEvent> listAlertPublishEvent() {
         return list(METADATA_ALERTS_PATH, new GenericType<List<AlertPublishEvent>>(){});
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
index 91f9cf8..957ac9a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
@@ -20,19 +20,18 @@ import org.junit.Assert;
 import org.junit.Test;
 
 public class PublishmentTypeTest {
+
     @Test
     public void testPublishmentType() {
         PublishmentType publishmentType = new PublishmentType();
         publishmentType.setType("KAFKA");
         publishmentType.setClassName("setClassName");
         publishmentType.setDescription("setDescription");
-        publishmentType.setFields("setFields");
 
         PublishmentType publishmentType1 = new PublishmentType();
         publishmentType1.setType("KAFKA");
         publishmentType1.setClassName("setClassName");
         publishmentType1.setDescription("setDescription");
-        publishmentType1.setFields("setFields");
 
         Assert.assertFalse(publishmentType.equals(new String("")));
         Assert.assertFalse(publishmentType == publishmentType1);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
index 2a0abce..cccf2e3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
@@ -29,6 +29,7 @@ import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
 import org.apache.eagle.alert.coordinator.trigger.CoordinatorTrigger;
 import org.apache.eagle.alert.coordinator.trigger.DynamicPolicyLoader;
 import org.apache.eagle.alert.coordinator.trigger.PolicyChangeListener;
+import org.apache.eagle.alert.coordinator.trigger.ScheduleStateCleaner;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 import org.apache.eagle.alert.service.MetadataServiceClientImpl;
@@ -73,6 +74,10 @@ public class Coordinator {
     private static final String METADATA_SERVICE_CONTEXT = "metadataService.context";
     private static final String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis";
     private static final String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis";
+    private static final String DYNAMIC_SCHEDULE_STATE_CLEAR_MIN = "metadataDynamicCheck.stateClearPeriodMin";
+    private static final String DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY = "metadataDynamicCheck.stateReservedCapacity";
+
+    private static final int DEFAULT_STATE_RESERVE_CAPACITY = 1000;
 
     public static final String GREEDY_SCHEDULER_ZK_PATH = "/alert/greedy/leader";
 
@@ -241,6 +246,13 @@ public class Coordinator {
         loader.addPolicyChangeListener(new PolicyChangeHandler(config, client));
         scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
 
+        if (config.hasPath(DYNAMIC_SCHEDULE_STATE_CLEAR_MIN) && config.hasPath(DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY)) {
+            int period = config.getInt(DYNAMIC_SCHEDULE_STATE_CLEAR_MIN);
+            int capacity = config.getInt(DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY);
+            ScheduleStateCleaner cleaner = new ScheduleStateCleaner(client, capacity);
+            scheduleSrv.scheduleAtFixedRate(cleaner, period, period, TimeUnit.MINUTES);
+        }
+
         Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv)));
         LOG.info("Eagle Coordinator started ...");
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java
new file mode 100644
index 0000000..0229c20
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordinator.trigger;
+
+import com.google.common.base.Stopwatch;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/** Runnable that calls IMetadataServiceClient#clearScheduleState with a fixed capacity and logs the elapsed time. */
+public class ScheduleStateCleaner implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(ScheduleStateCleaner.class);
+
+    private final IMetadataServiceClient client;
+    private final int reservedCapacity; // passed unchanged to clearScheduleState on every run
+
+    public ScheduleStateCleaner(IMetadataServiceClient client, int capacity) {
+        this.client = client;
+        this.reservedCapacity = capacity;
+    }
+
+    @Override
+    public void run() {
+        // we should catch every exception to avoid a zombie thread
+        try {
+            final Stopwatch watch = Stopwatch.createStarted();
+            LOG.info("clear schedule states start.");
+            client.clearScheduleState(reservedCapacity);
+            watch.stop();
+            LOG.info("clear schedule states completed. used time milliseconds: {}", watch.elapsed(TimeUnit.MILLISECONDS));
+        } catch (Throwable t) {
+            // the throwable is passed as the trailing argument so the stack trace is logged, not just the message
+            LOG.error("fail to clear schedule states due to {}, but continue to run", t.getMessage(), t);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
index ee7ca54..826cde4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
@@ -184,6 +184,11 @@ public class InMemMetadataServiceClient implements IMetadataServiceClient {
     }
 
     @Override
+    public void clearScheduleState(int maxCapacity) {
+
+    }
+
+    @Override
     public List<AlertPublishEvent> listAlertPublishEvent() {
         return this.alerts;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
index 7f650c6..2d3ee85 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
@@ -154,6 +154,11 @@ public class MockMetadataServiceClient implements IMetadataServiceClient {
     }
 
     @Override
+    public void clearScheduleState(int maxCapacity) {
+
+    }
+
+    @Override
     public List<AlertPublishEvent> listAlertPublishEvent() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index fc4a2bd..751853c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -31,6 +31,7 @@ import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
 import org.apache.eagle.alert.metadata.resource.Models;
 import org.apache.eagle.alert.metadata.resource.OpResult;
+
 import com.google.inject.Inject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,6 +74,12 @@ public class MetadataResource {
         return dao.clear();
     }
 
+    @Path("/clear/schedulestates")
+    @POST
+    public OpResult clearScheduleStates(int capacity) {
+        return dao.clearScheduleState(capacity);
+    }
+
     @Path("/export")
     @POST
     public Models export() {
@@ -291,7 +298,7 @@ public class MetadataResource {
                         }
                     }
                 } else {
-                    throw new IllegalArgumentException("Publishsment (name: " + publishmentId + ") not found");
+                    throw new IllegalArgumentException("Publishment (name: " + publishmentId + ") not found");
                 }
             }
 
@@ -535,4 +542,5 @@ public class MetadataResource {
         return results;
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
index ebe24e2..1711f0a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
@@ -44,6 +44,10 @@
             <version>${mongodb.version}</version>
         </dependency>
         <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+        </dependency>
+        <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
             <version>${mysql-connector-java.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
index 19d2b31..c5221c2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
@@ -25,13 +25,12 @@ import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.engine.model.AlertPublishEvent;
 import org.apache.eagle.alert.metadata.resource.Models;
 import org.apache.eagle.alert.metadata.resource.OpResult;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 public interface IMetadataDao extends Closeable {
 
@@ -90,8 +89,12 @@ public interface IMetadataDao extends Closeable {
 
     ScheduleState getScheduleState();
 
+    List<ScheduleState> listScheduleStates();
+
     OpResult addScheduleState(ScheduleState state);
 
+    OpResult clearScheduleState(int maxCapacity);
+
     List<PolicyAssignment> listAssignments();
 
     OpResult addAssignment(PolicyAssignment assignment);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
index 611bbb4..adfe15a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
@@ -28,6 +28,7 @@ import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.alert.metadata.MetadataUtils;
 import org.apache.eagle.alert.metadata.resource.Models;
 import org.apache.eagle.alert.metadata.resource.OpResult;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -267,6 +268,16 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
     }
 
     @Override
+    public List<ScheduleState> listScheduleStates() {
+        throw new UnsupportedOperationException("listScheduleStates not support!");
+    }
+
+    @Override
+    public OpResult clearScheduleState(int maxCapacity) {
+        throw new UnsupportedOperationException("clearScheduleState not support!");
+    }
+
+    @Override
     public List<PolicyAssignment> listAssignments() {
         return assignments;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
index 550eb00..933c02e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
@@ -19,12 +19,20 @@
 package org.apache.eagle.alert.metadata.impl;
 
 import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.model.AlertPublishEvent;
 import org.apache.eagle.alert.metadata.MetadataUtils;
 import org.apache.eagle.alert.metadata.resource.OpResult;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,23 +55,39 @@ public class JdbcDatabaseHandler {
     private static final String QUERY_CONDITION_STATEMENT = "SELECT value FROM %s WHERE id=?";
     private static final String QUERY_ORDERBY_STATEMENT = "SELECT value FROM %s ORDER BY id %s";
     private static final String QUERY_ALL_STATEMENT_WITH_SIZE = "SELECT value FROM %s limit %s";
+    private static final String CLEAR_SCHEDULESTATES_STATEMENT = "DELETE FROM schedule_state WHERE id NOT IN (SELECT id from (SELECT id FROM schedule_state ORDER BY id DESC limit ?) as states)";
 
     public enum SortType { DESC, ASC }
 
-    private Map<String, String> tblNameMap = new HashMap<>();
+    private static Map<String, String> tblNameMap = new HashMap<>();
 
     private static final ObjectMapper mapper = new ObjectMapper();
     private DataSource dataSource;
 
     static {
         mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+        registerTableName(StreamingCluster.class.getSimpleName(), "cluster");
+        registerTableName(StreamDefinition.class.getSimpleName(), "stream_schema");
+        registerTableName(Kafka2TupleMetadata.class.getSimpleName(), "datasource");
+        registerTableName(PolicyDefinition.class.getSimpleName(), "policy");
+        registerTableName(Publishment.class.getSimpleName(), "publishment");
+        registerTableName(PublishmentType.class.getSimpleName(), "publishment_type");
+        registerTableName(ScheduleState.class.getSimpleName(), "schedule_state");
+        registerTableName(PolicyAssignment.class.getSimpleName(), "assignment");
+        registerTableName(Topology.class.getSimpleName(), "topology");
+        registerTableName(AlertPublishEvent.class.getSimpleName(), "alert_event");
+    }
+
+    private static void registerTableName(String clzName, String tblName) {
+        tblNameMap.put(clzName, tblName);
     }
 
     public JdbcDatabaseHandler(Config config) {
         // "jdbc:mysql://dbhost/database?" + "user=sqluser&password=sqluserpw"
-        this.tblNameMap = JdbcSchemaManager.tblNameMap;
+        //this.tblNameMap = JdbcSchemaManager.tblNameMap;
         try {
-            JdbcSchemaManager.getInstance().init(config);
+            //JdbcSchemaManager.getInstance().init(config);
             BasicDataSource bDatasource = new BasicDataSource();
             bDatasource.setDriverClassName(config.getString(MetadataUtils.JDBC_DRIVER_PATH));
             if (config.hasPath(MetadataUtils.JDBC_USERNAME_PATH)) {
@@ -115,7 +139,7 @@ public class JdbcDatabaseHandler {
             connection.commit();
         } catch (SQLException e) {
             LOG.error(e.getMessage(), e.getCause());
-            if (e.getMessage().toLowerCase().contains("duplicate") && connection != null) {
+            if (connection != null) {
                 LOG.info("Detected duplicated entity");
                 try {
                     connection.rollback(savepoint);
@@ -193,15 +217,10 @@ public class JdbcDatabaseHandler {
         return executeSelectStatement(clz, query);
     }
 
-    public <T> T listTop(Class<T> clz, String sortType) {
+    public <T> List<T> listOrderBy(Class<T> clz, String sortType) {
         String tb = getTableName(clz.getSimpleName());
         String query = String.format(QUERY_ORDERBY_STATEMENT, tb, sortType);
-        List<T> result = executeSelectStatement(clz, query);
-        if (result.isEmpty()) {
-            return null;
-        } else {
-            return result.get(0);
-        }
+        return executeSelectStatement(clz, query);
     }
 
     public <T> T listWithFilter(String key, Class<T> clz) {
@@ -287,7 +306,7 @@ public class JdbcDatabaseHandler {
         Connection connection = null;
         try {
             connection = dataSource.getConnection();
-            PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb, key));
+            PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb));
             statement.setString(1, key);
             int status = statement.executeUpdate();
             String msg = String.format("delete %s entities from table %s", status, tb);
@@ -314,4 +333,67 @@ public class JdbcDatabaseHandler {
         //JdbcSchemaManager.getInstance().shutdown();
     }
 
+    /**
+     * Deletes the given keys from the table returned by getTableName(clzName) as one JDBC batch,
+     * committed in a single transaction.
+     *
+     * @param clzName simple class name used to look up the table name
+     * @param keys values bound, one batch entry each, to the first placeholder of DELETE_STATEMENT
+     * @return SUCCESS with the number of deleted rows in the message, or FAILURE with the SQL error message
+     */
+    public OpResult removeBatch(String clzName, List<String> keys) {
+        String tb = getTableName(clzName);
+        OpResult result = new OpResult();
+        // try-with-resources closes the statement first, then the connection, on every path
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb))) {
+            connection.setAutoCommit(false);
+            for (String key : keys) {
+                statement.setString(1, key);
+                statement.addBatch();
+            }
+            int[] counts = statement.executeBatch();
+            connection.commit();
+            int sum = 0;
+            for (int count : counts) {
+                // skip negative status codes such as Statement.SUCCESS_NO_INFO
+                if (count > 0) {
+                    sum += count;
+                }
+            }
+            result.code = OpResult.SUCCESS;
+            result.message = String.format("delete %s records from table %s", sum, tb);
+        } catch (SQLException e) {
+            result.code = OpResult.FAILURE;
+            result.message = e.getMessage();
+            LOG.error(e.getMessage(), e);
+        }
+        return result;
+    }
+
+    /**
+     * Runs CLEAR_SCHEDULESTATES_STATEMENT, which deletes every schedule_state row whose id is
+     * not among the newest ids selected with "ORDER BY id DESC limit ?".
+     * A SQLException raised while closing the statement or the connection is
+     * reported as FAILURE as well.
+     *
+     * @param capacity value bound to the "limit ?" placeholder
+     * @return SUCCESS with the deleted row count in the message, or FAILURE with the SQL error message
+     */
+    public OpResult removeScheduleStates(int capacity) {
+        OpResult result = new OpResult();
+        // try-with-resources closes the statement first, then the connection, on every path
+        try (Connection connection = dataSource.getConnection();
+             PreparedStatement statement = connection.prepareStatement(CLEAR_SCHEDULESTATES_STATEMENT)) {
+            statement.setInt(1, capacity);
+            int deleted = statement.executeUpdate();
+            result.message = String.format("delete %d records from schedule_state", deleted);
+            result.code = OpResult.SUCCESS;
+        } catch (SQLException e) {
+            result.code = OpResult.FAILURE;
+            result.message = e.getMessage();
+            LOG.error(e.getMessage(), e);
+        }
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
index 22435fe..384eddc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
@@ -26,10 +26,14 @@ import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.alert.metadata.MetadataUtils;
 import org.apache.eagle.alert.metadata.resource.Models;
 import org.apache.eagle.alert.metadata.resource.OpResult;
+
 import com.google.inject.Inject;
 import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -37,6 +41,7 @@ import java.util.stream.Collectors;
  * @since May 26, 2016.
  */
 public class JdbcMetadataDaoImpl implements IMetadataDao {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcMetadataDaoImpl.class);
     private JdbcDatabaseHandler handler;
 
     @Inject
@@ -101,12 +106,22 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
     @Override
     public ScheduleState getScheduleState(String versionId) {
         return handler.listWithFilter(versionId, ScheduleState.class);
-        //return null;
     }
 
     @Override
     public ScheduleState getScheduleState() {
-        return handler.listTop(ScheduleState.class, JdbcDatabaseHandler.SortType.DESC.toString());
+        List<ScheduleState> scheduleStates =
+                handler.listOrderBy(ScheduleState.class, JdbcDatabaseHandler.SortType.DESC.toString());
+        if (scheduleStates.isEmpty()) {
+            return null;
+        } else {
+            return scheduleStates.get(0);
+        }
+    }
+
+    /**
+     * Lists the ScheduleState records returned by the JDBC handler.
+     */
+    @Override
+    public List<ScheduleState> listScheduleStates() {
+        final List<ScheduleState> states = handler.list(ScheduleState.class);
+        return states;
+    }
 
     @Override
@@ -160,6 +175,16 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
     }
 
     @Override
+    public OpResult clearScheduleState(int maxCapacity) {
+        // A non-positive capacity falls back to 10 before it is handed to the handler.
+        final int capacity = maxCapacity > 0 ? maxCapacity : 10;
+        final OpResult outcome = handler.removeScheduleStates(capacity);
+        // The handler's message is logged at INFO whatever the result code is.
+        LOG.info(outcome.message);
+        return outcome;
+    }
+
+    @Override
     public OpResult addAssignment(PolicyAssignment assignment) {
         return handler.addOrReplace(PolicyAssignment.class.getSimpleName(), assignment);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
index 4568726..a02c51e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
@@ -41,6 +41,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+@Deprecated
 public class JdbcSchemaManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(JdbcSchemaManager.class);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
index af0494e..d639bff 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
@@ -43,6 +43,7 @@ import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.alert.metadata.MetadataUtils;
 import org.apache.eagle.alert.metadata.resource.Models;
 import org.apache.eagle.alert.metadata.resource.OpResult;
+
 import org.bson.BsonDocument;
 import org.bson.BsonInt32;
 import org.bson.BsonString;
@@ -501,6 +502,16 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
         return state;
     }
 
+    /**
+     * Not implemented for the MongoDB-backed DAO.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public List<ScheduleState> listScheduleStates() {
+        throw new UnsupportedOperationException("listScheduleStates is not supported by MongoMetadataDaoImpl");
+    }
+
+    /**
+     * Not implemented for the MongoDB-backed DAO.
+     *
+     * @param maxCapacity ignored
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public OpResult clearScheduleState(int maxCapacity) {
+        throw new UnsupportedOperationException("clearScheduleState is not supported by MongoMetadataDaoImpl");
+    }
+
     private ScheduleState addDetailForScheduleState(ScheduleState state, String version) {
         Map<String, SpoutSpec> spoutMaps = maps(spoutSpecs, SpoutSpec.class, version);
         if (spoutMaps.size() != 0) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
index 7655f54..f45fd12 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.eagle.alert.metadata.impl;
 
-import com.google.common.collect.Lists;
 import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
@@ -25,12 +24,11 @@ import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
 import org.apache.eagle.alert.engine.model.AlertPublishEvent;
 import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.alert.metadata.resource.OpResult;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
 /**
  * @since May 1, 2016
  */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
index 7a2fcb5..9d188c4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
@@ -20,6 +20,7 @@ package org.apache.eagle.alert.metadata.impl;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -29,15 +30,14 @@ import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.coordinator.PublishmentType;
 import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
 import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
+
 import org.apache.eagle.alert.metadata.resource.OpResult;
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
 
 public class JdbcImplTest {
     private static Logger LOG = LoggerFactory.getLogger(JdbcImplTest.class);
@@ -45,10 +45,9 @@ public class JdbcImplTest {
 
     @BeforeClass
     public static void setup() {
-        System.setProperty("config.resource", "/application-mysql.conf");
         ConfigFactory.invalidateCaches();
-        Config config = ConfigFactory.load();
-        dao = new JdbcMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA));
+        Config config = ConfigFactory.load("application-jdbc.conf");
+        dao = new JdbcMetadataDaoImpl(config);
     }
 
     @AfterClass
@@ -64,7 +63,6 @@ public class JdbcImplTest {
 
     private String TOPO_NAME = "topoName";
 
-    @Ignore
     @Test
     public void test_apis() {
         // publishment
@@ -109,6 +107,8 @@ public class JdbcImplTest {
             Assert.assertEquals(200, result.code);
             List<StreamingCluster> assigns = dao.listClusters();
             Assert.assertEquals(1, assigns.size());
+            dao.removeCluster("dd");
+            Assert.assertEquals(0, dao.listClusters().size());
         }
         // data source
         {
@@ -133,10 +133,20 @@ public class JdbcImplTest {
         {
             PublishmentType publishmentType = new PublishmentType();
             publishmentType.setType("KAFKA");
+            List<Map<String, String>> fields = new ArrayList<>();
+            Map<String, String> field1 = new HashMap<>();
+            field1.put("name", "kafka_broker");
+            field1.put("value", "sandbox.hortonworks.com:6667");
+            Map<String, String> field2 = new HashMap<>();
+            field2.put("name", "topic");
+            fields.add(field1);
+            fields.add(field2);
+            publishmentType.setFields(fields);
             OpResult result = dao.addPublishmentType(publishmentType);
             Assert.assertEquals(200, result.code);
-            List<PublishmentType> assigns = dao.listPublishmentType();
-            Assert.assertEquals(1, assigns.size());
+            List<PublishmentType> types = dao.listPublishmentType();
+            Assert.assertEquals(1, types.size());
+            Assert.assertEquals(2, types.get(0).getFields().size());
         }
     }
 
@@ -151,7 +161,6 @@ public class JdbcImplTest {
         Assert.assertEquals(state.getVersion(), versionId);
     }
 
-    @Ignore
     @Test
     public void test_readCurrentState() {
         test_addstate();
@@ -161,4 +170,26 @@ public class JdbcImplTest {
         LOG.debug(state.getVersion());
         LOG.debug(state.getGenerateTime());
     }
+
+    /**
+     * Adds ten schedule states, trims the store to {@code maxCapacity} entries and checks
+     * that the versions left are those of the last {@code maxCapacity} states added.
+     */
+    @Test
+    public void test_clearScheduleState() {
+        int maxCapacity = 4;
+        int total = 10;
+        List<String> reservedOnes = new ArrayList<>();
+        // Several iterations can run within the same millisecond, so build each version from
+        // one base timestamp plus the loop index: ids stay unique and strictly increasing.
+        long baseTime = System.currentTimeMillis();
+        for (int i = 0; i < total; i++) {
+            ScheduleState state = new ScheduleState();
+            String versionId = "state-" + (baseTime + i);
+            state.setVersion(versionId);
+            state.setGenerateTime(String.valueOf(new Date().getTime()));
+            dao.addScheduleState(state);
+            if (i >= total - maxCapacity) {
+                reservedOnes.add(versionId);
+            }
+        }
+        dao.clearScheduleState(maxCapacity);
+        List<ScheduleState> scheduleStates = dao.listScheduleStates();
+        // assertEquals reports expected vs. actual size on failure.
+        Assert.assertEquals(maxCapacity, scheduleStates.size());
+        List<String> targetVersions = new ArrayList<>();
+        for (ScheduleState state : scheduleStates) {
+            targetVersions.add(state.getVersion());
+        }
+        Assert.assertTrue(CollectionUtils.isEqualCollection(reservedOnes, targetVersions));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-jdbc.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-jdbc.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-jdbc.conf
new file mode 100644
index 0000000..9c71a28
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-jdbc.conf
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+metadata {
+  metadataDao = org.apache.eagle.alert.metadata.impl.JdbcMetadataDaoImpl
+  jdbc {
+    username = null
+    password = null
+    driverClassName = "org.h2.Driver"
+    connection = "jdbc:h2:mem:test;INIT=RUNSCRIPT FROM './src/test/resources/init.sql'"
+    connectionProperties = "encoding=UTF8;timeout=60"
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mysql.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mysql.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mysql.conf
deleted file mode 100644
index 2a1aa2c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mysql.conf
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-metadata {
-  metadataDao = org.apache.eagle.alert.metadata.impl.JdbcMetadataDaoImpl
-  jdbc {
-    connection = "jdbc:mysql://localhost:3306/alert_metadata?user=root&password=&createDatabaseIfNotExist=true"
-    database = "alert_metadata"
-    driverClassName = com.mysql.jdbc.Driver
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/init.sql
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/init.sql b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/init.sql
new file mode 100644
index 0000000..90e9515
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/init.sql
@@ -0,0 +1,70 @@
+-- /*
+--  * Licensed to the Apache Software Foundation (ASF) under one or more
+--  * contributor license agreements.  See the NOTICE file distributed with
+--  * this work for additional information regarding copyright ownership.
+--  * The ASF licenses this file to You under the Apache License, Version 2.0
+--  * (the "License"); you may not use this file except in compliance with
+--  * the License.  You may obtain a copy of the License at
+--  *
+--  *    http://www.apache.org/licenses/LICENSE-2.0
+--  *
+--  * Unless required by applicable law or agreed to in writing, software
+--  * distributed under the License is distributed on an "AS IS" BASIS,
+--  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+--  * See the License for the specific language governing permissions and
+--  * limitations under the License.
+--  *
+--  */
+
+CREATE TABLE IF NOT EXISTS cluster (
+  id VARCHAR(50) PRIMARY KEY,
+  value longtext
+);
+
+
+CREATE TABLE IF NOT EXISTS stream_schema (
+  id VARCHAR(50) PRIMARY KEY,
+  value longtext
+);
+
+CREATE TABLE IF NOT EXISTS datasource (
+  id VARCHAR(50) PRIMARY KEY,
+  value longtext
+);
+
+CREATE TABLE IF NOT EXISTS policy (
+  id VARCHAR(50) PRIMARY KEY,
+  value longtext
+);
+
+CREATE TABLE IF NOT EXISTS publishment (
+  id VARCHAR(50) PRIMARY KEY,
+  value longtext
+);
+
+
+CREATE TABLE IF NOT EXISTS publishment_type (
+  id VARCHAR(50) PRIMARY KEY,
+  value longtext
+);
+
+
+CREATE TABLE IF NOT EXISTS schedule_state (
+  id VARCHAR(50) PRIMARY KEY,
+  value longtext
+);
+
+CREATE TABLE IF NOT EXISTS assignment (
+  id VARCHAR(50) PRIMARY KEY,
+  value longtext
+);
+
+CREATE TABLE IF NOT EXISTS topology (
+  id VARCHAR(50) PRIMARY KEY,
+  value longtext
+);
+
+CREATE TABLE IF NOT EXISTS alert_event (
+  id VARCHAR(50) PRIMARY KEY,
+  value longtext
+);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-server-assembly/src/main/conf/eagle.conf
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/conf/eagle.conf b/eagle-server-assembly/src/main/conf/eagle.conf
index 9c804a6..5f6c240 100644
--- a/eagle-server-assembly/src/main/conf/eagle.conf
+++ b/eagle-server-assembly/src/main/conf/eagle.conf
@@ -17,21 +17,21 @@
 # Eagle REST Web Service Configuration
 # ---------------------------------------------
 service {
-	env = "testing"
-	host = "localhost"
-	port = 9090
-	username = "admin"
-	password = "secret"
-	readTimeOutSeconds = 60
-	context = "/rest"
-	timezone = "UTC"
+  env = "testing"
+  host = "localhost"
+  port = 9090
+  username = "admin"
+  password = "secret"
+  readTimeOutSeconds = 60
+  context = "/rest"
+  timezone = "UTC"
 }
 
 zookeeper {
-	zkQuorum = "localhost:2181"
-	zkSessionTimeoutMs : 15000
-	zkRetryTimes : 3
-	zkRetryInterval : 20000
+  zkQuorum = "localhost:2181"
+  zkSessionTimeoutMs : 15000
+  zkRetryTimes : 3
+  zkRetryInterval : 20000
 }
 
 # ---------------------------------------------
@@ -39,57 +39,57 @@ zookeeper {
 # ---------------------------------------------
 
 storage {
-	# storage type: ["hbase","jdbc"]
-	# default is "hbase"
-	type = "hbase"
+  # storage type: ["hbase","jdbc"]
+  # default is "hbase"
+  type = "hbase"
 
-	hbase {
-		# hbase configuration: hbase.zookeeper.quorum
-		# default is "localhost"
-		zookeeperQuorum = "localhost"
+  hbase {
+    # hbase configuration: hbase.zookeeper.quorum
+    # default is "localhost"
+    zookeeperQuorum = "localhost"
 
-		# hbase configuration: hbase.zookeeper.property.clientPort
-		# default is 2181
-		zookeeperPropertyClientPort = 2181
+    # hbase configuration: hbase.zookeeper.property.clientPort
+    # default is 2181
+    zookeeperPropertyClientPort = 2181
 
-		# hbase configuration: zookeeper.znode.parent
-		# default is "/hbase"
-		zookeeperZnodeParent = "/hbase-unsecure"
+    # hbase configuration: zookeeper.znode.parent
+    # default is "/hbase"
+    zookeeperZnodeParent = "/hbase-unsecure"
 
-		# eagle web login profile: [sandbox, default]
-		# default is sandbox
-		tableNamePrefixedWithEnvironment = false
+    # eagle web login profile: [sandbox, default]
+    # default is sandbox
+    tableNamePrefixedWithEnvironment = false
 
-		# eagle coprocessor enabled or not: [true, false]
-		# default is false
-		coprocessorEnabled = false
-	}
+    # eagle coprocessor enabled or not: [true, false]
+    # default is false
+    coprocessorEnabled = false
+  }
 }
 
 # ---------------------------------------------
 # Eagle Metadata Store Configuration
 # ---------------------------------------------
 metadata {
-	store = org.apache.eagle.metadata.service.memory.MemoryMetadataStore
-	jdbc {
-		username = "root"
-		password = ""
-		driverClassName = com.mysql.jdbc.Driver
-		url = "jdbc:mysql://server.eagle.apache.org:3306/eagle"
-	}
+  store = org.apache.eagle.metadata.service.memory.MemoryMetadataStore
+  jdbc {
+    username = "root"
+    password = ""
+    driverClassName = com.mysql.jdbc.Driver
+    url = "jdbc:mysql://server.eagle.apache.org:3306/eagle"
+  }
 }
 
 # ---------------------------------------------
 # Eagle Application Configuration
 # ---------------------------------------------
 application {
-	sink {
-		type = org.apache.eagle.app.sink.KafkaStreamSink
-	}
-	storm {
-		nimbusHost = "server.eagle.apache.org"
-		nimbusThriftPort = 6627
-	}
+  sink {
+    type = org.apache.eagle.app.sink.KafkaStreamSink
+  }
+  storm {
+    nimbusHost = "server.eagle.apache.org"
+    nimbusThriftPort = 6627
+  }
 }
 
 # ---------------------------------------------
@@ -98,27 +98,29 @@ application {
 
 # Coordinator Configuration
 coordinator {
-	policiesPerBolt = 5
-	boltParallelism = 5
-	policyDefaultParallelism = 5
-	boltLoadUpbound = 0.8
-	topologyLoadUpbound = 0.8
-	numOfAlertBoltsPerTopology = 5
-	zkConfig {
-		zkQuorum = "server.eagle.apache.org:2181"
-		zkRoot = "/alert"
-		zkSessionTimeoutMs = 10000
-		connectionTimeoutMs = 10000
-		zkRetryTimes = 3
-		zkRetryInterval = 3000
-	}
-	metadataService {
-		host = "localhost",
-		port = 9090,
-		context = "/rest"
-	}
-	metadataDynamicCheck {
-		initDelayMillis = 1000
-		delayMillis = 30000
+  policiesPerBolt = 5
+  boltParallelism = 5
+  policyDefaultParallelism = 5
+  boltLoadUpbound = 0.8
+  topologyLoadUpbound = 0.8
+  numOfAlertBoltsPerTopology = 5
+  zkConfig {
+    zkQuorum = "server.eagle.apache.org:2181"
+    zkRoot = "/alert"
+    zkSessionTimeoutMs = 10000
+    connectionTimeoutMs = 10000
+    zkRetryTimes = 3
+    zkRetryInterval = 3000
+  }
+  metadataService {
+    host = "localhost",
+    port = 9090,
+    context = "/rest"
+  }
+  metadataDynamicCheck {
+    initDelayMillis = 1000
+    delayMillis = 30000
+    stateClearPeriodMin = 1440
+    stateReservedCapacity = 100
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-server/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/application.conf b/eagle-server/src/main/resources/application.conf
index 91757d7..ce68550 100644
--- a/eagle-server/src/main/resources/application.conf
+++ b/eagle-server/src/main/resources/application.conf
@@ -139,5 +139,7 @@ coordinator {
   metadataDynamicCheck {
     initDelayMillis = 1000
     delayMillis = 30000
+    stateClearPeriodMin = 1440
+    stateReservedCapacity = 100
   }
 }