You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/11/29 07:14:11 UTC
incubator-eagle git commit: add ScheduleStateCleaner.java
Repository: incubator-eagle
Updated Branches:
refs/heads/master c69f94eff -> 021c2bddd
add ScheduleStateCleaner.java
https://issues.apache.org/jira/browse/EAGLE-803
Author: Zhao, Qingwen <qi...@apache.org>
Closes #694 from qingwen220/EAGLE-803.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/021c2bdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/021c2bdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/021c2bdd
Branch: refs/heads/master
Commit: 021c2bddddd98892442f43d0425c67effa192273
Parents: c69f94e
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Tue Nov 29 15:14:04 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Tue Nov 29 15:14:04 2016 +0800
----------------------------------------------------------------------
.../engine/coordinator/PublishmentType.java | 9 +-
.../alert/service/IMetadataServiceClient.java | 2 +
.../service/MetadataServiceClientImpl.java | 7 +
.../engine/coordinator/PublishmentTypeTest.java | 3 +-
.../eagle/alert/coordinator/Coordinator.java | 12 ++
.../trigger/ScheduleStateCleaner.java | 53 +++++++
.../mock/InMemMetadataServiceClient.java | 5 +
.../integration/MockMetadataServiceClient.java | 5 +
.../metadata/resource/MetadataResource.java | 10 +-
.../alert-metadata/pom.xml | 4 +
.../eagle/alert/metadata/IMetadataDao.java | 7 +-
.../metadata/impl/InMemMetadataDaoImpl.java | 11 ++
.../metadata/impl/JdbcDatabaseHandler.java | 106 ++++++++++++--
.../metadata/impl/JdbcMetadataDaoImpl.java | 29 +++-
.../alert/metadata/impl/JdbcSchemaManager.java | 1 +
.../metadata/impl/MongoMetadataDaoImpl.java | 11 ++
.../eagle/alert/metadata/impl/InMemoryTest.java | 4 +-
.../eagle/alert/metadata/impl/JdbcImplTest.java | 51 +++++--
.../src/test/resources/application-jdbc.conf | 25 ++++
.../src/test/resources/application-mysql.conf | 23 ----
.../alert-metadata/src/test/resources/init.sql | 70 ++++++++++
eagle-server-assembly/src/main/conf/eagle.conf | 138 ++++++++++---------
.../src/main/resources/application.conf | 2 +
23 files changed, 462 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
index 5329dfa..2718cfe 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
@@ -21,6 +21,8 @@ package org.apache.eagle.alert.engine.coordinator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.commons.lang3.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
@JsonIgnoreProperties(ignoreUnknown = true)
@@ -29,7 +31,8 @@ public class PublishmentType {
private String type;
private String className;
private String description;
- private String fields;
+
+ private List<Map<String, String>> fields;
public String getType() {
return type;
@@ -55,11 +58,11 @@ public class PublishmentType {
this.description = description;
}
- public String getFields() {
+ public List<Map<String, String>> getFields() {
return fields;
}
- public void setFields(String fields) {
+ public void setFields(List<Map<String, String>> fields) {
this.fields = fields;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
index b00fc78..efa6d0e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
@@ -86,6 +86,8 @@ public interface IMetadataServiceClient extends Closeable, Serializable {
void clear();
+ void clearScheduleState(int maxCapacity);
+
// for topology mgmt
// for alert event
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
index 209a3a6..8571e56 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
@@ -69,6 +69,7 @@ public class MetadataServiceClientImpl implements IMetadataServiceClient {
private static final String METADATA_ALERTS_BATCH_PATH = "/metadata/alerts/batch";
private static final String METADATA_CLEAR_PATH = "/metadata/clear";
+ private static final String METADATA_CLEAR_SCHEDULESTATES_PATH = "/metadata/clear/schedulestates";
private static final String EAGLE_CORRELATION_CONTEXT = "metadataService.context";
public static final String EAGLE_CORRELATION_SERVICE_PORT = "metadataService.port";
@@ -279,6 +280,12 @@ public class MetadataServiceClientImpl implements IMetadataServiceClient {
}
@Override
+ public void clearScheduleState(int maxCapacity) {
+ WebResource r = client.resource(basePath + METADATA_CLEAR_SCHEDULESTATES_PATH);
+ r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(maxCapacity);
+ }
+
+ @Override
public List<AlertPublishEvent> listAlertPublishEvent() {
return list(METADATA_ALERTS_PATH, new GenericType<List<AlertPublishEvent>>(){});
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
index 91f9cf8..957ac9a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
@@ -20,19 +20,18 @@ import org.junit.Assert;
import org.junit.Test;
public class PublishmentTypeTest {
+
@Test
public void testPublishmentType() {
PublishmentType publishmentType = new PublishmentType();
publishmentType.setType("KAFKA");
publishmentType.setClassName("setClassName");
publishmentType.setDescription("setDescription");
- publishmentType.setFields("setFields");
PublishmentType publishmentType1 = new PublishmentType();
publishmentType1.setType("KAFKA");
publishmentType1.setClassName("setClassName");
publishmentType1.setDescription("setDescription");
- publishmentType1.setFields("setFields");
Assert.assertFalse(publishmentType.equals(new String("")));
Assert.assertFalse(publishmentType == publishmentType1);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
index 2a0abce..cccf2e3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
@@ -29,6 +29,7 @@ import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
import org.apache.eagle.alert.coordinator.trigger.CoordinatorTrigger;
import org.apache.eagle.alert.coordinator.trigger.DynamicPolicyLoader;
import org.apache.eagle.alert.coordinator.trigger.PolicyChangeListener;
+import org.apache.eagle.alert.coordinator.trigger.ScheduleStateCleaner;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.service.IMetadataServiceClient;
import org.apache.eagle.alert.service.MetadataServiceClientImpl;
@@ -73,6 +74,10 @@ public class Coordinator {
private static final String METADATA_SERVICE_CONTEXT = "metadataService.context";
private static final String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis";
private static final String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis";
+ private static final String DYNAMIC_SCHEDULE_STATE_CLEAR_MIN = "metadataDynamicCheck.stateClearPeriodMin";
+ private static final String DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY = "metadataDynamicCheck.stateReservedCapacity";
+
+ private static final int DEFAULT_STATE_RESERVE_CAPACITY = 1000;
public static final String GREEDY_SCHEDULER_ZK_PATH = "/alert/greedy/leader";
@@ -241,6 +246,13 @@ public class Coordinator {
loader.addPolicyChangeListener(new PolicyChangeHandler(config, client));
scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
+ if (config.hasPath(DYNAMIC_SCHEDULE_STATE_CLEAR_MIN) && config.hasPath(DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY)) {
+ int period = config.getInt(DYNAMIC_SCHEDULE_STATE_CLEAR_MIN);
+ int capacity = config.getInt(DYNAMIC_SCHEDULE_STATE_RESERVE_CAPACITY);
+ ScheduleStateCleaner cleaner = new ScheduleStateCleaner(client, capacity);
+ scheduleSrv.scheduleAtFixedRate(cleaner, period, period, TimeUnit.MINUTES);
+ }
+
Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv)));
LOG.info("Eagle Coordinator started ...");
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java
new file mode 100644
index 0000000..0229c20
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/ScheduleStateCleaner.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordinator.trigger;
+
+import com.google.common.base.Stopwatch;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+public class ScheduleStateCleaner implements Runnable {
+
+ private static Logger LOG = LoggerFactory.getLogger(ScheduleStateCleaner.class);
+
+ private IMetadataServiceClient client;
+ private int reservedCapacity;
+
+ public ScheduleStateCleaner(IMetadataServiceClient client, int capacity) {
+ this.client = client;
+ this.reservedCapacity = capacity;
+ }
+
+ @Override
+ public void run() {
+ // we should catch every exception to avoid a zombie thread
+ try {
+ final Stopwatch watch = Stopwatch.createStarted();
+ LOG.info("clear schedule states start.");
+ client.clearScheduleState(reservedCapacity);
+ watch.stop();
+ LOG.info("clear schedule states completed. used time milliseconds: {}", watch.elapsed(TimeUnit.MILLISECONDS));
+ // reset cached policies
+ } catch (Throwable t) {
+ LOG.error("fail to clear schedule states due to {}, but continue to run", t.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
index ee7ca54..826cde4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
@@ -184,6 +184,11 @@ public class InMemMetadataServiceClient implements IMetadataServiceClient {
}
@Override
+ public void clearScheduleState(int maxCapacity) {
+
+ }
+
+ @Override
public List<AlertPublishEvent> listAlertPublishEvent() {
return this.alerts;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
index 7f650c6..2d3ee85 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
@@ -154,6 +154,11 @@ public class MockMetadataServiceClient implements IMetadataServiceClient {
}
@Override
+ public void clearScheduleState(int maxCapacity) {
+
+ }
+
+ @Override
public List<AlertPublishEvent> listAlertPublishEvent() {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index fc4a2bd..751853c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -31,6 +31,7 @@ import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
import org.apache.eagle.alert.metadata.resource.Models;
import org.apache.eagle.alert.metadata.resource.OpResult;
+
import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +74,12 @@ public class MetadataResource {
return dao.clear();
}
+ @Path("/clear/schedulestates")
+ @POST
+ public OpResult clearScheduleStates(int capacity) {
+ return dao.clearScheduleState(capacity);
+ }
+
@Path("/export")
@POST
public Models export() {
@@ -291,7 +298,7 @@ public class MetadataResource {
}
}
} else {
- throw new IllegalArgumentException("Publishsment (name: " + publishmentId + ") not found");
+ throw new IllegalArgumentException("Publishment (name: " + publishmentId + ") not found");
}
}
@@ -535,4 +542,5 @@ public class MetadataResource {
return results;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
index ebe24e2..1711f0a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
@@ -44,6 +44,10 @@
<version>${mongodb.version}</version>
</dependency>
<dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ </dependency>
+ <dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector-java.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
index 19d2b31..c5221c2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
@@ -25,13 +25,12 @@ import org.apache.eagle.alert.engine.coordinator.*;
import org.apache.eagle.alert.engine.model.AlertPublishEvent;
import org.apache.eagle.alert.metadata.resource.Models;
import org.apache.eagle.alert.metadata.resource.OpResult;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
public interface IMetadataDao extends Closeable {
@@ -90,8 +89,12 @@ public interface IMetadataDao extends Closeable {
ScheduleState getScheduleState();
+ List<ScheduleState> listScheduleStates();
+
OpResult addScheduleState(ScheduleState state);
+ OpResult clearScheduleState(int maxCapacity);
+
List<PolicyAssignment> listAssignments();
OpResult addAssignment(PolicyAssignment assignment);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
index 611bbb4..adfe15a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
@@ -28,6 +28,7 @@ import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.MetadataUtils;
import org.apache.eagle.alert.metadata.resource.Models;
import org.apache.eagle.alert.metadata.resource.OpResult;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -267,6 +268,16 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
}
@Override
+ public List<ScheduleState> listScheduleStates() {
+ throw new UnsupportedOperationException("listScheduleStates not support!");
+ }
+
+ @Override
+ public OpResult clearScheduleState(int maxCapacity) {
+ throw new UnsupportedOperationException("clearScheduleState not support!");
+ }
+
+ @Override
public List<PolicyAssignment> listAssignments() {
return assignments;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
index 550eb00..933c02e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcDatabaseHandler.java
@@ -19,12 +19,20 @@
package org.apache.eagle.alert.metadata.impl;
import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.model.AlertPublishEvent;
import org.apache.eagle.alert.metadata.MetadataUtils;
import org.apache.eagle.alert.metadata.resource.OpResult;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,23 +55,39 @@ public class JdbcDatabaseHandler {
private static final String QUERY_CONDITION_STATEMENT = "SELECT value FROM %s WHERE id=?";
private static final String QUERY_ORDERBY_STATEMENT = "SELECT value FROM %s ORDER BY id %s";
private static final String QUERY_ALL_STATEMENT_WITH_SIZE = "SELECT value FROM %s limit %s";
+ private static final String CLEAR_SCHEDULESTATES_STATEMENT = "DELETE FROM schedule_state WHERE id NOT IN (SELECT id from (SELECT id FROM schedule_state ORDER BY id DESC limit ?) as states)";
public enum SortType { DESC, ASC }
- private Map<String, String> tblNameMap = new HashMap<>();
+ private static Map<String, String> tblNameMap = new HashMap<>();
private static final ObjectMapper mapper = new ObjectMapper();
private DataSource dataSource;
static {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ registerTableName(StreamingCluster.class.getSimpleName(), "cluster");
+ registerTableName(StreamDefinition.class.getSimpleName(), "stream_schema");
+ registerTableName(Kafka2TupleMetadata.class.getSimpleName(), "datasource");
+ registerTableName(PolicyDefinition.class.getSimpleName(), "policy");
+ registerTableName(Publishment.class.getSimpleName(), "publishment");
+ registerTableName(PublishmentType.class.getSimpleName(), "publishment_type");
+ registerTableName(ScheduleState.class.getSimpleName(), "schedule_state");
+ registerTableName(PolicyAssignment.class.getSimpleName(), "assignment");
+ registerTableName(Topology.class.getSimpleName(), "topology");
+ registerTableName(AlertPublishEvent.class.getSimpleName(), "alert_event");
+ }
+
+ private static void registerTableName(String clzName, String tblName) {
+ tblNameMap.put(clzName, tblName);
}
public JdbcDatabaseHandler(Config config) {
// "jdbc:mysql://dbhost/database?" + "user=sqluser&password=sqluserpw"
- this.tblNameMap = JdbcSchemaManager.tblNameMap;
+ //this.tblNameMap = JdbcSchemaManager.tblNameMap;
try {
- JdbcSchemaManager.getInstance().init(config);
+ //JdbcSchemaManager.getInstance().init(config);
BasicDataSource bDatasource = new BasicDataSource();
bDatasource.setDriverClassName(config.getString(MetadataUtils.JDBC_DRIVER_PATH));
if (config.hasPath(MetadataUtils.JDBC_USERNAME_PATH)) {
@@ -115,7 +139,7 @@ public class JdbcDatabaseHandler {
connection.commit();
} catch (SQLException e) {
LOG.error(e.getMessage(), e.getCause());
- if (e.getMessage().toLowerCase().contains("duplicate") && connection != null) {
+ if (connection != null) {
LOG.info("Detected duplicated entity");
try {
connection.rollback(savepoint);
@@ -193,15 +217,10 @@ public class JdbcDatabaseHandler {
return executeSelectStatement(clz, query);
}
- public <T> T listTop(Class<T> clz, String sortType) {
+ public <T> List<T> listOrderBy(Class<T> clz, String sortType) {
String tb = getTableName(clz.getSimpleName());
String query = String.format(QUERY_ORDERBY_STATEMENT, tb, sortType);
- List<T> result = executeSelectStatement(clz, query);
- if (result.isEmpty()) {
- return null;
- } else {
- return result.get(0);
- }
+ return executeSelectStatement(clz, query);
}
public <T> T listWithFilter(String key, Class<T> clz) {
@@ -287,7 +306,7 @@ public class JdbcDatabaseHandler {
Connection connection = null;
try {
connection = dataSource.getConnection();
- PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb, key));
+ PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb));
statement.setString(1, key);
int status = statement.executeUpdate();
String msg = String.format("delete %s entities from table %s", status, tb);
@@ -314,4 +333,67 @@ public class JdbcDatabaseHandler {
//JdbcSchemaManager.getInstance().shutdown();
}
+ public OpResult removeBatch(String clzName, List<String> keys) {
+ String tb = getTableName(clzName);
+ OpResult result = new OpResult();
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ connection.setAutoCommit(false);
+ PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb));
+ for (String key : keys) {
+ statement.setString(1, key);
+ statement.addBatch();
+ }
+ int[] num = statement.executeBatch();
+ connection.commit();
+ int sum = 0;
+ for (int i : num) {
+ sum += i;
+ }
+ String msg = String.format("delete %s records from table %s", sum, tb);
+ result.code = OpResult.SUCCESS;
+ result.message = msg;
+ statement.close();
+ } catch (SQLException e) {
+ result.code = OpResult.FAILURE;
+ result.message = e.getMessage();
+ LOG.error(e.getMessage(), e);
+ } finally {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
+ }
+ }
+ }
+ return result;
+ }
+
+ public OpResult removeScheduleStates(int capacity) {
+ OpResult result = new OpResult();
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ PreparedStatement statement = connection.prepareStatement(CLEAR_SCHEDULESTATES_STATEMENT);
+ statement.setInt(1, capacity);
+ result.message = String.format("delete %d records from schedule_state", statement.executeUpdate());
+ result.code = OpResult.SUCCESS;
+ statement.close();
+ } catch (SQLException e) {
+ result.code = OpResult.FAILURE;
+ result.message = e.getMessage();
+ LOG.error(e.getMessage(), e);
+ } finally {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ LOG.error("Failed to close statement: {}", e.getMessage(), e.getCause());
+ }
+ }
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
index 22435fe..384eddc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java
@@ -26,10 +26,14 @@ import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.MetadataUtils;
import org.apache.eagle.alert.metadata.resource.Models;
import org.apache.eagle.alert.metadata.resource.OpResult;
+
import com.google.inject.Inject;
import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -37,6 +41,7 @@ import java.util.stream.Collectors;
* @since May 26, 2016.
*/
public class JdbcMetadataDaoImpl implements IMetadataDao {
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcMetadataDaoImpl.class);
private JdbcDatabaseHandler handler;
@Inject
@@ -101,12 +106,22 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
@Override
public ScheduleState getScheduleState(String versionId) {
return handler.listWithFilter(versionId, ScheduleState.class);
- //return null;
}
@Override
public ScheduleState getScheduleState() {
- return handler.listTop(ScheduleState.class, JdbcDatabaseHandler.SortType.DESC.toString());
+ List<ScheduleState> scheduleStates =
+ handler.listOrderBy(ScheduleState.class, JdbcDatabaseHandler.SortType.DESC.toString());
+ if (scheduleStates.isEmpty()) {
+ return null;
+ } else {
+ return scheduleStates.get(0);
+ }
+ }
+
+ /**
+ * Lists the stored schedule states by delegating to {@code handler.list(ScheduleState.class)}.
+ *
+ * @return whatever list the JDBC handler yields for {@link ScheduleState}
+ */
+ @Override
+ public List<ScheduleState> listScheduleStates() {
+ return handler.list(ScheduleState.class);
}
@Override
@@ -160,6 +175,16 @@ public class JdbcMetadataDaoImpl implements IMetadataDao {
}
@Override
+    public OpResult clearScheduleState(int maxCapacity) {
+        // A non-positive capacity is replaced by a fallback of 10 before delegating to the handler.
+        final int capacity = maxCapacity > 0 ? maxCapacity : 10;
+        final OpResult outcome = handler.removeScheduleStates(capacity);
+        // The handler's message is logged at INFO regardless of the result code.
+        LOG.info(outcome.message);
+        return outcome;
+    }
+
+ @Override
public OpResult addAssignment(PolicyAssignment assignment) {
return handler.addOrReplace(PolicyAssignment.class.getSimpleName(), assignment);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
index 4568726..a02c51e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcSchemaManager.java
@@ -41,6 +41,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+@Deprecated
public class JdbcSchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(JdbcSchemaManager.class);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
index af0494e..d639bff 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
@@ -43,6 +43,7 @@ import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.MetadataUtils;
import org.apache.eagle.alert.metadata.resource.Models;
import org.apache.eagle.alert.metadata.resource.OpResult;
+
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;
@@ -501,6 +502,16 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
return state;
}
+    /**
+     * Not implemented for the MongoDB-backed DAO.
+     *
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public List<ScheduleState> listScheduleStates() {
+        throw new UnsupportedOperationException("listScheduleStates is not supported");
+    }
+
+    /**
+     * Not implemented for the MongoDB-backed DAO.
+     *
+     * @param maxCapacity ignored by this implementation
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public OpResult clearScheduleState(int maxCapacity) {
+        throw new UnsupportedOperationException("clearScheduleState is not supported");
+    }
+
private ScheduleState addDetailForScheduleState(ScheduleState state, String version) {
Map<String, SpoutSpec> spoutMaps = maps(spoutSpecs, SpoutSpec.class, version);
if (spoutMaps.size() != 0) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
index 7655f54..f45fd12 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.eagle.alert.metadata.impl;
-import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.ScheduleState;
@@ -25,12 +24,11 @@ import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
import org.apache.eagle.alert.engine.model.AlertPublishEvent;
import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.resource.OpResult;
+
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.LoggerFactory;
-import java.util.List;
-
/**
* @since May 1, 2016
*/
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
index 7a2fcb5..9d188c4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
@@ -20,6 +20,7 @@ package org.apache.eagle.alert.metadata.impl;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -29,15 +30,14 @@ import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.coordinator.PublishmentType;
import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
+
import org.apache.eagle.alert.metadata.resource.OpResult;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
public class JdbcImplTest {
private static Logger LOG = LoggerFactory.getLogger(JdbcImplTest.class);
@@ -45,10 +45,9 @@ public class JdbcImplTest {
@BeforeClass
public static void setup() {
- System.setProperty("config.resource", "/application-mysql.conf");
ConfigFactory.invalidateCaches();
- Config config = ConfigFactory.load();
- dao = new JdbcMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA));
+ Config config = ConfigFactory.load("application-jdbc.conf");
+ dao = new JdbcMetadataDaoImpl(config);
}
@AfterClass
@@ -64,7 +63,6 @@ public class JdbcImplTest {
private String TOPO_NAME = "topoName";
- @Ignore
@Test
public void test_apis() {
// publishment
@@ -109,6 +107,8 @@ public class JdbcImplTest {
Assert.assertEquals(200, result.code);
List<StreamingCluster> assigns = dao.listClusters();
Assert.assertEquals(1, assigns.size());
+ dao.removeCluster("dd");
+ Assert.assertEquals(0, dao.listClusters().size());
}
// data source
{
@@ -133,10 +133,20 @@ public class JdbcImplTest {
{
PublishmentType publishmentType = new PublishmentType();
publishmentType.setType("KAFKA");
+ List<Map<String, String>> fields = new ArrayList<>();
+ Map<String, String> field1 = new HashMap<>();
+ field1.put("name", "kafka_broker");
+ field1.put("value", "sandbox.hortonworks.com:6667");
+ Map<String, String> field2 = new HashMap<>();
+ field2.put("name", "topic");
+ fields.add(field1);
+ fields.add(field2);
+ publishmentType.setFields(fields);
OpResult result = dao.addPublishmentType(publishmentType);
Assert.assertEquals(200, result.code);
- List<PublishmentType> assigns = dao.listPublishmentType();
- Assert.assertEquals(1, assigns.size());
+ List<PublishmentType> types = dao.listPublishmentType();
+ Assert.assertEquals(1, types.size());
+ Assert.assertEquals(2, types.get(0).getFields().size());
}
}
@@ -151,7 +161,6 @@ public class JdbcImplTest {
Assert.assertEquals(state.getVersion(), versionId);
}
- @Ignore
@Test
public void test_readCurrentState() {
test_addstate();
@@ -161,4 +170,26 @@ public class JdbcImplTest {
LOG.debug(state.getVersion());
LOG.debug(state.getGenerateTime());
}
+
+    /**
+     * Inserts ten schedule states with strictly increasing version ids, trims the table to
+     * {@code maxCapacity} entries and checks that exactly the last-inserted versions remain.
+     */
+    @Test
+    public void test_clearScheduleState() {
+        int maxCapacity = 4;
+        int total = 10;
+        List<String> reservedOnes = new ArrayList<>();
+        // Capture the clock once and offset it by the loop index: calling
+        // System.currentTimeMillis() per iteration can return the same value several times in a
+        // fast loop, which would produce duplicate version ids.
+        long baseTime = System.currentTimeMillis();
+        for (int i = 0; i < total; i++) {
+            long stamp = baseTime + i;
+            ScheduleState state = new ScheduleState();
+            String versionId = "state-" + stamp;
+            state.setVersion(versionId);
+            state.setGenerateTime(String.valueOf(stamp));
+            dao.addScheduleState(state);
+            if (i >= total - maxCapacity) {
+                reservedOnes.add(versionId);
+            }
+        }
+        dao.clearScheduleState(maxCapacity);
+        List<ScheduleState> scheduleStates = dao.listScheduleStates();
+        // assertEquals reports both sizes on failure, unlike assertTrue(a == b).
+        Assert.assertEquals(maxCapacity, scheduleStates.size());
+        List<String> targetVersions = new ArrayList<>();
+        for (ScheduleState remaining : scheduleStates) {
+            targetVersions.add(remaining.getVersion());
+        }
+        Assert.assertTrue(CollectionUtils.isEqualCollection(reservedOnes, targetVersions));
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-jdbc.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-jdbc.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-jdbc.conf
new file mode 100644
index 0000000..9c71a28
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-jdbc.conf
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Metadata DAO settings loaded by JdbcImplTest: an in-memory H2 database named "test".
+metadata {
+ metadataDao = org.apache.eagle.alert.metadata.impl.JdbcMetadataDaoImpl
+ jdbc {
+ username = null
+ password = null
+ driverClassName = "org.h2.Driver"
+ # INIT=RUNSCRIPT makes H2 execute init.sql when a connection is opened. The script path is
+ # relative, so it presumably resolves against the working directory of the test run - verify.
+ connection = "jdbc:h2:mem:test;INIT=RUNSCRIPT FROM './src/test/resources/init.sql'"
+ connectionProperties = "encoding=UTF8;timeout=60"
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mysql.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mysql.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mysql.conf
deleted file mode 100644
index 2a1aa2c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/application-mysql.conf
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-metadata {
- metadataDao = org.apache.eagle.alert.metadata.impl.JdbcMetadataDaoImpl
- jdbc {
- connection = "jdbc:mysql://localhost:3306/alert_metadata?user=root&password=&createDatabaseIfNotExist=true"
- database = "alert_metadata"
- driverClassName = com.mysql.jdbc.Driver
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/init.sql
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/init.sql b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/init.sql
new file mode 100644
index 0000000..90e9515
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/resources/init.sql
@@ -0,0 +1,70 @@
+-- /*
+-- * Licensed to the Apache Software Foundation (ASF) under one or more
+-- * contributor license agreements. See the NOTICE file distributed with
+-- * this work for additional information regarding copyright ownership.
+-- * The ASF licenses this file to You under the Apache License, Version 2.0
+-- * (the "License"); you may not use this file except in compliance with
+-- * the License. You may obtain a copy of the License at
+-- *
+-- * http://www.apache.org/licenses/LICENSE-2.0
+-- *
+-- * Unless required by applicable law or agreed to in writing, software
+-- * distributed under the License is distributed on an "AS IS" BASIS,
+-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- * See the License for the specific language governing permissions and
+-- * limitations under the License.
+-- *
+-- */
+
+-- Metadata tables for the test database. Every table has the same layout:
+-- a VARCHAR(50) primary key `id` and a longtext `value` column.
+-- IF NOT EXISTS lets the script run repeatedly without failing on existing tables.
+CREATE TABLE IF NOT EXISTS cluster (
+ id VARCHAR(50) PRIMARY KEY,
+ value longtext
+);
+
+
+CREATE TABLE IF NOT EXISTS stream_schema (
+ id VARCHAR(50) PRIMARY KEY,
+ value longtext
+);
+
+CREATE TABLE IF NOT EXISTS datasource (
+ id VARCHAR(50) PRIMARY KEY,
+ value longtext
+);
+
+CREATE TABLE IF NOT EXISTS policy (
+ id VARCHAR(50) PRIMARY KEY,
+ value longtext
+);
+
+CREATE TABLE IF NOT EXISTS publishment (
+ id VARCHAR(50) PRIMARY KEY,
+ value longtext
+);
+
+
+CREATE TABLE IF NOT EXISTS publishment_type (
+ id VARCHAR(50) PRIMARY KEY,
+ value longtext
+);
+
+
+CREATE TABLE IF NOT EXISTS schedule_state (
+ id VARCHAR(50) PRIMARY KEY,
+ value longtext
+);
+
+CREATE TABLE IF NOT EXISTS assignment (
+ id VARCHAR(50) PRIMARY KEY,
+ value longtext
+);
+
+CREATE TABLE IF NOT EXISTS topology (
+ id VARCHAR(50) PRIMARY KEY,
+ value longtext
+);
+
+CREATE TABLE IF NOT EXISTS alert_event (
+ id VARCHAR(50) PRIMARY KEY,
+ value longtext
+);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-server-assembly/src/main/conf/eagle.conf
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/conf/eagle.conf b/eagle-server-assembly/src/main/conf/eagle.conf
index 9c804a6..5f6c240 100644
--- a/eagle-server-assembly/src/main/conf/eagle.conf
+++ b/eagle-server-assembly/src/main/conf/eagle.conf
@@ -17,21 +17,21 @@
# Eagle REST Web Service Configuration
# ---------------------------------------------
service {
- env = "testing"
- host = "localhost"
- port = 9090
- username = "admin"
- password = "secret"
- readTimeOutSeconds = 60
- context = "/rest"
- timezone = "UTC"
+ env = "testing"
+ host = "localhost"
+ port = 9090
+ username = "admin"
+ password = "secret"
+ readTimeOutSeconds = 60
+ context = "/rest"
+ timezone = "UTC"
}
zookeeper {
- zkQuorum = "localhost:2181"
- zkSessionTimeoutMs : 15000
- zkRetryTimes : 3
- zkRetryInterval : 20000
+ zkQuorum = "localhost:2181"
+ zkSessionTimeoutMs : 15000
+ zkRetryTimes : 3
+ zkRetryInterval : 20000
}
# ---------------------------------------------
@@ -39,57 +39,57 @@ zookeeper {
# ---------------------------------------------
storage {
- # storage type: ["hbase","jdbc"]
- # default is "hbase"
- type = "hbase"
+ # storage type: ["hbase","jdbc"]
+ # default is "hbase"
+ type = "hbase"
- hbase {
- # hbase configuration: hbase.zookeeper.quorum
- # default is "localhost"
- zookeeperQuorum = "localhost"
+ hbase {
+ # hbase configuration: hbase.zookeeper.quorum
+ # default is "localhost"
+ zookeeperQuorum = "localhost"
- # hbase configuration: hbase.zookeeper.property.clientPort
- # default is 2181
- zookeeperPropertyClientPort = 2181
+ # hbase configuration: hbase.zookeeper.property.clientPort
+ # default is 2181
+ zookeeperPropertyClientPort = 2181
- # hbase configuration: zookeeper.znode.parent
- # default is "/hbase"
- zookeeperZnodeParent = "/hbase-unsecure"
+ # hbase configuration: zookeeper.znode.parent
+ # default is "/hbase"
+ zookeeperZnodeParent = "/hbase-unsecure"
- # eagle web login profile: [sandbox, default]
- # default is sandbox
- tableNamePrefixedWithEnvironment = false
+ # eagle web login profile: [sandbox, default]
+ # default is sandbox
+ tableNamePrefixedWithEnvironment = false
- # eagle coprocessor enabled or not: [true, false]
- # default is false
- coprocessorEnabled = false
- }
+ # eagle coprocessor enabled or not: [true, false]
+ # default is false
+ coprocessorEnabled = false
+ }
}
# ---------------------------------------------
# Eagle Metadata Store Configuration
# ---------------------------------------------
metadata {
- store = org.apache.eagle.metadata.service.memory.MemoryMetadataStore
- jdbc {
- username = "root"
- password = ""
- driverClassName = com.mysql.jdbc.Driver
- url = "jdbc:mysql://server.eagle.apache.org:3306/eagle"
- }
+ store = org.apache.eagle.metadata.service.memory.MemoryMetadataStore
+ jdbc {
+ username = "root"
+ password = ""
+ driverClassName = com.mysql.jdbc.Driver
+ url = "jdbc:mysql://server.eagle.apache.org:3306/eagle"
+ }
}
# ---------------------------------------------
# Eagle Application Configuration
# ---------------------------------------------
application {
- sink {
- type = org.apache.eagle.app.sink.KafkaStreamSink
- }
- storm {
- nimbusHost = "server.eagle.apache.org"
- nimbusThriftPort = 6627
- }
+ sink {
+ type = org.apache.eagle.app.sink.KafkaStreamSink
+ }
+ storm {
+ nimbusHost = "server.eagle.apache.org"
+ nimbusThriftPort = 6627
+ }
}
# ---------------------------------------------
@@ -98,27 +98,29 @@ application {
# Coordinator Configuration
coordinator {
- policiesPerBolt = 5
- boltParallelism = 5
- policyDefaultParallelism = 5
- boltLoadUpbound = 0.8
- topologyLoadUpbound = 0.8
- numOfAlertBoltsPerTopology = 5
- zkConfig {
- zkQuorum = "server.eagle.apache.org:2181"
- zkRoot = "/alert"
- zkSessionTimeoutMs = 10000
- connectionTimeoutMs = 10000
- zkRetryTimes = 3
- zkRetryInterval = 3000
- }
- metadataService {
- host = "localhost",
- port = 9090,
- context = "/rest"
- }
- metadataDynamicCheck {
- initDelayMillis = 1000
- delayMillis = 30000
+ policiesPerBolt = 5
+ boltParallelism = 5
+ policyDefaultParallelism = 5
+ boltLoadUpbound = 0.8
+ topologyLoadUpbound = 0.8
+ numOfAlertBoltsPerTopology = 5
+ zkConfig {
+ zkQuorum = "server.eagle.apache.org:2181"
+ zkRoot = "/alert"
+ zkSessionTimeoutMs = 10000
+ connectionTimeoutMs = 10000
+ zkRetryTimes = 3
+ zkRetryInterval = 3000
+ }
+ metadataService {
+ host = "localhost",
+ port = 9090,
+ context = "/rest"
+ }
+ metadataDynamicCheck {
+ initDelayMillis = 1000
+ delayMillis = 30000
+ stateClearPeriodMin = 1440
+ stateReservedCapacity = 100
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/021c2bdd/eagle-server/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/application.conf b/eagle-server/src/main/resources/application.conf
index 91757d7..ce68550 100644
--- a/eagle-server/src/main/resources/application.conf
+++ b/eagle-server/src/main/resources/application.conf
@@ -139,5 +139,7 @@ coordinator {
metadataDynamicCheck {
initDelayMillis = 1000
delayMillis = 30000
+ stateClearPeriodMin = 1440
+ stateReservedCapacity = 100
}
}