You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/11/23 06:30:02 UTC
incubator-eagle git commit: [EAGLE-776] add unit test for
eagle-alert-parent
Repository: incubator-eagle
Updated Branches:
refs/heads/master 7499be694 -> 8d7f81e1c
[EAGLE-776] add unit test for eagle-alert-parent
add unit test for eagle-alert-parent
https://issues.apache.org/jira/browse/EAGLE-776
Author: koone <lu...@126.com>
Closes #673 from koone/EAGLE-777.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8d7f81e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8d7f81e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8d7f81e1
Branch: refs/heads/master
Commit: 8d7f81e1cdcc61e16769677dda2c371897f25dca
Parents: 7499be6
Author: koone <lu...@126.com>
Authored: Wed Nov 23 14:29:48 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Wed Nov 23 14:29:48 2016 +0800
----------------------------------------------------------------------
.../alert-metadata-service/pom.xml | 13 +-
.../impl/TopologyMgmtResourceImplTest.java | 72 ++++
.../src/test/resources/application.conf | 10 +-
.../alert-metadata/pom.xml | 12 +
.../eagle/alert/metadata/MetadataUtils.java | 1 +
.../metadata/impl/InMemMetadataDaoImpl.java | 6 +-
.../eagle/alert/metadata/TestMetadataUtils.java | 59 ++++
.../eagle/alert/metadata/impl/InMemoryTest.java | 117 +++++++
.../eagle/alert/metadata/impl/JdbcImplTest.java | 164 +++++++++
.../alert/metadata/impl/MongoImplTest.java | 344 ++++++++++++++++++
.../alert/resource/impl/InMemoryTest.java | 48 ---
.../alert/resource/impl/JdbcImplTest.java | 165 ---------
.../alert/resource/impl/MongoImplTest.java | 345 -------------------
13 files changed, 791 insertions(+), 565 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
index 9d5e8f1..cf1f0fc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
@@ -84,7 +84,18 @@
<groupId>io.swagger</groupId>
<artifactId>swagger-jaxrs</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
index e46213e..b9a7634 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
@@ -18,13 +18,40 @@
package org.apache.eagle.service.topology.resource.impl;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.TopologySummary;
+import backtype.storm.utils.NimbusClient;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl;
+import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import static org.powermock.api.mockito.PowerMockito.when;
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({TopologyMgmtResourceImpl.class, StormSubmitter.class})
public class TopologyMgmtResourceImplTest {
TopologyMgmtResourceImpl topologyManager = new TopologyMgmtResourceImpl();
String topologyName = "testStartTopology";
@@ -52,4 +79,49 @@ public class TopologyMgmtResourceImplTest {
List<TopologyStatus> topologies = topologyManager.getTopologies();
Assert.assertTrue(topologies.size() == 1);
}
+
+ @Test
+ public void testGetTopologies1() throws Exception {
+ IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
+ TopologyMgmtResourceImpl service = new TopologyMgmtResourceImpl();
+ Field daoField = TopologyMgmtResourceImpl.class.getDeclaredField("dao");
+ daoField.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(daoField, daoField.getModifiers() & ~Modifier.FINAL);
+ daoField.set(null, dao);
+ // set data
+ Topology topology = new Topology("test", 1, 1);
+ StreamingCluster cluster =new StreamingCluster();
+ dao.clear();
+ dao.addTopology(topology);
+ dao.addCluster(cluster);
+ TopologyMgmtResourceImpl spy = PowerMockito.spy(service);
+ PowerMockito.doReturn(new TopologySummary()).when(spy,"getTopologySummery", Mockito.anyCollection(), Mockito.any(Topology.class));
+ Assert.assertEquals(1, spy.getTopologies().size());
+ }
+
+ @Test
+ public void testStartTopology1() throws Exception {
+ IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
+ TopologyMgmtResourceImpl service = new TopologyMgmtResourceImpl();
+ Field daoField = TopologyMgmtResourceImpl.class.getDeclaredField("dao");
+ daoField.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(daoField, daoField.getModifiers() & ~Modifier.FINAL);
+ daoField.set(null, dao);
+ // set data
+ Topology topology = new Topology("test", 1, 1);
+ StreamingCluster cluster =new StreamingCluster();
+ dao.clear();
+ dao.addTopology(topology);
+ dao.addCluster(cluster);
+ PowerMockito.mockStatic(StormSubmitter.class);
+ PowerMockito.doNothing().when(StormSubmitter.class, "submitTopology",Mockito.eq("test"), Mockito.anyMap(), Mockito.any(StormTopology.class));
+ TopologyMgmtResourceImpl spy = PowerMockito.spy(service);
+ PowerMockito.doReturn(null).when(spy,"createTopology", Mockito.any(Topology.class));
+ spy.startTopology("test");
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf
index f760241..1b6a281 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf
@@ -13,8 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-{
- "datastore": {
- "metadataDao": "org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl"
+metadata {
+ metadataDao = org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl
+ jdbc {
+ url = "localhost:27017"
+ }
+ properties {
+
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
index 07f373a..ebe24e2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
@@ -64,6 +64,18 @@
<artifactId>guice</artifactId>
<version>3.0</version>
</dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
index 3e03b57..be22280 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
@@ -75,6 +75,7 @@ public class MetadataUtils {
}
public static Connection getJdbcConnection(Config config) {
+
Connection connection = null;
try {
if (config.hasPath(JDBC_USERNAME_PATH)) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
index b608516..611bbb4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
@@ -16,6 +16,8 @@
*/
package org.apache.eagle.alert.metadata.impl;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -26,8 +28,6 @@ import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.MetadataUtils;
import org.apache.eagle.alert.metadata.resource.Models;
import org.apache.eagle.alert.metadata.resource.OpResult;
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -296,7 +296,7 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
}
@Override
- public OpResult clear() {
+ public synchronized OpResult clear() {
LOG.info("clear models...");
this.assignments.clear();
this.clusters.clear();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/TestMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/TestMetadataUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/TestMetadataUtils.java
new file mode 100644
index 0000000..1191dcb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/TestMetadataUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata;
+
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Created by luokun on 2016/11/16.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(MetadataUtils.class)
+public class TestMetadataUtils {
+
+ @Rule
+ public ExpectedException thrown= ExpectedException.none();
+
+ @Test
+ public void testGetKey() throws Exception {
+ StreamDefinition stream = new StreamDefinition();
+ Assert.assertNull(MetadataUtils.getKey(stream));
+ PolicyAssignment policyAssignment = new PolicyAssignment();
+ policyAssignment.setPolicyName("test");
+ Assert.assertEquals("test", MetadataUtils.getKey(policyAssignment));
+ ScheduleState scheduleState = new ScheduleState();
+ scheduleState.setVersion("1.0");
+ Assert.assertEquals("1.0", MetadataUtils.getKey(scheduleState));
+ }
+
+ @Test
+ public void testGetKeyThrowable() {
+ thrown.expect(RuntimeException.class);
+ Object obj = new Object();
+ MetadataUtils.getKey(obj);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
new file mode 100644
index 0000000..7655f54
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata.impl;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.engine.model.AlertPublishEvent;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metadata.resource.OpResult;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * @since May 1, 2016
+ */
+public class InMemoryTest {
+
+ private IMetadataDao dao = new InMemMetadataDaoImpl(ConfigFactory.load());
+
+ @Test
+ public void test_AddPolicy() {
+
+ LoggerFactory.getLogger(InMemoryTest.class);
+
+ MetadataDaoFactory.getInstance().getMetadataDao();
+
+ PolicyDefinition pd = new PolicyDefinition();
+ pd.setName("pd1");
+ dao.addPolicy(pd);
+
+ Assert.assertEquals(1, dao.listPolicies().size());
+ }
+
+ @Test
+ public void testAddCluster(){
+ StreamingCluster cluster1 = new StreamingCluster();
+ cluster1.setName("test1");
+ StreamingCluster cluster2 = new StreamingCluster();
+ cluster2.setName("test2");
+ StreamingCluster cluster3 = new StreamingCluster();
+ cluster3.setName("test2");
+ OpResult opResult1 = dao.addCluster(cluster1);
+ Assert.assertEquals(OpResult.SUCCESS,opResult1.code);
+ OpResult opResult2 = dao.addCluster(cluster2);
+ Assert.assertEquals(OpResult.SUCCESS,opResult2.code);
+ OpResult opResult3 = dao.addCluster(cluster3);
+ Assert.assertEquals(OpResult.SUCCESS,opResult3.code);
+ Assert.assertTrue(opResult3.message.contains("replace"));
+ dao.clear();
+ }
+
+ @Test
+ public void testRemoveDataSource(){
+ Kafka2TupleMetadata dataSource1 = new Kafka2TupleMetadata();
+ Kafka2TupleMetadata dataSource2 = new Kafka2TupleMetadata();
+ dataSource1.setName("test1");
+ dataSource2.setName("test2");
+ dao.addDataSource(dataSource1);
+ dao.addDataSource(dataSource2);
+ OpResult opResult1 = dao.removeDataSource("test1");
+ Assert.assertEquals(OpResult.SUCCESS, opResult1.code);
+ OpResult opResult2 = dao.removeDataSource("test1");
+ Assert.assertEquals(OpResult.SUCCESS, opResult2.code);
+ Assert.assertTrue(opResult2.message.contains("no configuration"));
+ dao.clear();
+ }
+
+ @Test
+ public void testListAlertPublishEvent(){
+ dao.addAlertPublishEvent(new AlertPublishEvent());
+ dao.addAlertPublishEvent(new AlertPublishEvent());
+ Assert.assertEquals(2,dao.listAlertPublishEvent(5).size());
+ }
+
+ @Test
+ public void testGetAlertPublishEventByPolicyId(){
+ AlertPublishEvent alert1 = new AlertPublishEvent();
+ AlertPublishEvent alert2 = new AlertPublishEvent();
+ alert1.setAlertId("1");
+ alert1.setPolicyId("1");
+ alert2.setAlertId("2");
+ alert2.setPolicyId("1");
+ dao.addAlertPublishEvent(alert1);
+ dao.addAlertPublishEvent(alert2);
+ Assert.assertNotNull(dao.getAlertPublishEvent("1"));
+ Assert.assertEquals(2, dao.getAlertPublishEventByPolicyId("1", 2).size());
+ }
+
+ @Test
+ public void testAddScheduleState(){
+ ScheduleState scheduleState = new ScheduleState();
+ scheduleState.setVersion("1");
+ Assert.assertEquals(OpResult.SUCCESS,dao.addScheduleState(scheduleState).code);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
new file mode 100644
index 0000000..7a2fcb5
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.metadata.impl;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metadata.MetadataUtils;
+import org.apache.eagle.alert.metadata.resource.OpResult;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
+public class JdbcImplTest {
+ private static Logger LOG = LoggerFactory.getLogger(JdbcImplTest.class);
+ static IMetadataDao dao;
+
+ @BeforeClass
+ public static void setup() {
+ System.setProperty("config.resource", "/application-mysql.conf");
+ ConfigFactory.invalidateCaches();
+ Config config = ConfigFactory.load();
+ dao = new JdbcMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA));
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if (dao != null) {
+ try {
+ dao.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private String TOPO_NAME = "topoName";
+
+ @Ignore
+ @Test
+ public void test_apis() {
+ // publishment
+ {
+ Publishment publishment = new Publishment();
+ publishment.setName("pub-");
+ OpResult result = dao.addPublishment(publishment);
+ Assert.assertEquals(200, result.code);
+ List<Publishment> assigns = dao.listPublishment();
+ Assert.assertEquals(1, assigns.size());
+ result = dao.removePublishment("pub-");
+ Assert.assertTrue(200 == result.code);
+ }
+ // topology
+ {
+ OpResult result = dao.addTopology(new Topology(TOPO_NAME, 3, 5));
+ System.out.println(result.message);
+ Assert.assertEquals(200, result.code);
+ List<Topology> topos = dao.listTopologies();
+ Assert.assertEquals(1, topos.size());
+ // add again: replace existing one
+ result = dao.addTopology(new Topology(TOPO_NAME, 4, 5));
+ topos = dao.listTopologies();
+ Assert.assertEquals(1, topos.size());
+ Assert.assertEquals(TOPO_NAME, topos.get(0).getName());
+ Assert.assertEquals(4, topos.get(0).getNumOfGroupBolt());
+ }
+ // assignment
+ {
+ PolicyAssignment assignment = new PolicyAssignment();
+ assignment.setPolicyName("policy1");
+ OpResult result = dao.addAssignment(assignment);
+ Assert.assertEquals(200, result.code);
+ List<PolicyAssignment> assigns = dao.listAssignments();
+ Assert.assertEquals(1, assigns.size());
+ }
+ // cluster
+ {
+ StreamingCluster cluster = new StreamingCluster();
+ cluster.setName("dd");
+ OpResult result = dao.addCluster(cluster);
+ Assert.assertEquals(200, result.code);
+ List<StreamingCluster> assigns = dao.listClusters();
+ Assert.assertEquals(1, assigns.size());
+ }
+ // data source
+ {
+ Kafka2TupleMetadata dataSource = new Kafka2TupleMetadata();
+ dataSource.setName("ds");
+ OpResult result = dao.addDataSource(dataSource);
+ Assert.assertEquals(200, result.code);
+ List<Kafka2TupleMetadata> assigns = dao.listDataSources();
+ Assert.assertEquals(1, assigns.size());
+ }
+ // policy
+ {
+ PolicyDefinition policy = new PolicyDefinition();
+ policy.setName("ds");
+ OpResult result = dao.addPolicy(policy);
+ Assert.assertEquals(200, result.code);
+ List<PolicyDefinition> assigns = dao.listPolicies();
+ Assert.assertEquals(1, assigns.size());
+ }
+
+ // publishmentType
+ {
+ PublishmentType publishmentType = new PublishmentType();
+ publishmentType.setType("KAFKA");
+ OpResult result = dao.addPublishmentType(publishmentType);
+ Assert.assertEquals(200, result.code);
+ List<PublishmentType> assigns = dao.listPublishmentType();
+ Assert.assertEquals(1, assigns.size());
+ }
+ }
+
+ private void test_addstate() {
+ ScheduleState state = new ScheduleState();
+ String versionId = "state-" + System.currentTimeMillis();
+ state.setVersion(versionId);
+ state.setGenerateTime(String.valueOf(new Date().getTime()));
+ OpResult result = dao.addScheduleState(state);
+ Assert.assertEquals(200, result.code);
+ state = dao.getScheduleState();
+ Assert.assertEquals(state.getVersion(), versionId);
+ }
+
+ @Ignore
+ @Test
+ public void test_readCurrentState() {
+ test_addstate();
+ ScheduleState state = dao.getScheduleState();
+ Assert.assertNotNull(state);
+
+ LOG.debug(state.getVersion());
+ LOG.debug(state.getGenerateTime());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java
new file mode 100644
index 0000000..3b3ddf9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata.impl;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodProcess;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.runtime.Network;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.eagle.alert.coordination.model.*;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.model.AlertPublishEvent;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metadata.MetadataUtils;
+import org.apache.eagle.alert.metadata.resource.OpResult;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * @since May 1, 2016
+ */
+public class MongoImplTest {
+ private static Logger LOG = LoggerFactory.getLogger(MongoImplTest.class);
+ static IMetadataDao dao;
+
+ private static MongodExecutable mongodExe;
+ private static MongodProcess mongod;
+
+ public static void before() {
+ try {
+ MongodStarter starter = MongodStarter.getDefaultInstance();
+ mongodExe = starter.prepare(new MongodConfigBuilder().version(Version.V3_2_1)
+ .net(new Net(27017, Network.localhostIsIPv6())).build());
+ mongod = mongodExe.start();
+ } catch (Exception e) {
+ LOG.error("start embed mongod failed, assume some external mongo running. continue run test!", e);
+ }
+ }
+
+ @BeforeClass
+ public static void setup() {
+ before();
+
+ System.setProperty("config.resource", "/application-mongo.conf");
+ ConfigFactory.invalidateCaches();
+ Config config = ConfigFactory.load();
+ dao = new MongoMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA));
+
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if (mongod != null) {
+ try {
+ mongod.stop();
+ } catch (IllegalStateException e) {
+ // catch this exception for the unstable stopping mongodb
+ // reason: the exception is usually thrown out with below message format when stop() returns null value,
+ // but actually this should have been captured in ProcessControl.stopOrDestroyProcess() by destroying
+ // the process ultimately
+ if (e.getMessage() != null && e.getMessage().matches("^Couldn't kill.*process!.*")) {
+ // if matches, do nothing, just ignore the exception
+ } else {
+ LOG.warn(String.format("Ignored error for stopping mongod process, see stack trace: %s", ExceptionUtils.getStackTrace(e)));
+ }
+ }
+ mongodExe.stop();
+ }
+ }
+
+ private String TOPO_NAME = "topoName";
+
+ @Test
+ public void test_apis() throws Exception {
+ // topology
+ {
+ OpResult result = dao.addTopology(new Topology(TOPO_NAME, 3, 5));
+ System.out.println(result.message);
+ Assert.assertEquals(200, result.code);
+ List<Topology> topos = dao.listTopologies();
+ Assert.assertEquals(1, topos.size());
+
+ result = dao.addTopology(new Topology(TOPO_NAME + "-new", 3, 5));
+ topos = dao.listTopologies();
+ Assert.assertEquals(2, topos.size());
+ // add again: replace existing one
+ result = dao.addTopology(new Topology(TOPO_NAME, 4, 5));
+ topos = dao.listTopologies();
+ Assert.assertEquals(2, topos.size());
+ Assert.assertEquals(TOPO_NAME, topos.get(0).getName());
+ Assert.assertEquals(4, topos.get(0).getNumOfGroupBolt());
+ }
+ // assignment
+ {
+ PolicyAssignment assignment = new PolicyAssignment();
+ assignment.setPolicyName("policy1");
+ OpResult result = dao.addAssignment(assignment);
+ Assert.assertEquals(200, result.code);
+ List<PolicyAssignment> assigns = dao.listAssignments();
+ Assert.assertEquals(1, assigns.size());
+ }
+ // cluster
+ {
+ StreamingCluster cluster = new StreamingCluster();
+ cluster.setName("dd");
+ OpResult result = dao.addCluster(cluster);
+ Assert.assertEquals(200, result.code);
+ List<StreamingCluster> assigns = dao.listClusters();
+ Assert.assertEquals(1, assigns.size());
+ }
+ // data source
+ {
+ Kafka2TupleMetadata dataSource = new Kafka2TupleMetadata();
+ dataSource.setName("ds");
+ OpResult result = dao.addDataSource(dataSource);
+ Assert.assertEquals(200, result.code);
+ List<Kafka2TupleMetadata> assigns = dao.listDataSources();
+ Assert.assertEquals(1, assigns.size());
+ }
+ // policy
+ {
+ PolicyDefinition policy = new PolicyDefinition();
+ policy.setName("ds");
+ OpResult result = dao.addPolicy(policy);
+ Assert.assertEquals(200, result.code);
+ List<PolicyDefinition> assigns = dao.listPolicies();
+ Assert.assertEquals(1, assigns.size());
+ }
+ // publishment
+ {
+ Publishment publishment = new Publishment();
+ publishment.setName("pub-");
+ OpResult result = dao.addPublishment(publishment);
+ Assert.assertEquals(200, result.code);
+ List<Publishment> assigns = dao.listPublishment();
+ Assert.assertEquals(1, assigns.size());
+ }
+ // publishmentType
+ {
+ PublishmentType publishmentType = new PublishmentType();
+ publishmentType.setType("KAFKA");
+ OpResult result = dao.addPublishmentType(publishmentType);
+ Assert.assertEquals(200, result.code);
+ List<PublishmentType> assigns = dao.listPublishmentType();
+ Assert.assertEquals(1, assigns.size());
+ }
+
+ // schedule state
+ {
+ ScheduleState state = new ScheduleState();
+ state.setVersion("001");
+ state.setScheduleTimeMillis(3000);
+ state.setCode(200);
+ OpResult result = dao.addScheduleState(state);
+ Assert.assertEquals(200, result.code);
+
+ Thread.sleep(1000);
+
+ state = new ScheduleState();
+ state.setScheduleTimeMillis(3000);
+ state.setVersion("002");
+ state.setCode(201);
+ result = dao.addScheduleState(state);
+ Assert.assertEquals(200, result.code);
+
+ ScheduleState getState = dao.getScheduleState();
+ Assert.assertEquals(201, getState.getCode());
+ }
+ // stream
+ {
+ StreamDefinition stream = new StreamDefinition();
+ stream.setStreamId("stream");
+ OpResult result = dao.createStream(stream);
+ Assert.assertEquals(200, result.code);
+ List<StreamDefinition> assigns = dao.listStreams();
+ Assert.assertEquals(1, assigns.size());
+ }
+ // alert
+ {
+ AlertPublishEvent alert = new AlertPublishEvent();
+ alert.setAlertTimestamp(System.currentTimeMillis());
+ alert.setAlertId(UUID.randomUUID().toString());
+ OpResult result = dao.addAlertPublishEvent(alert);
+ Assert.assertEquals(200, result.code);
+ List<AlertPublishEvent> alerts = dao.listAlertPublishEvent(2);
+ Assert.assertEquals(1, alerts.size());
+ }
+ }
+
+ private void test_addstate() {
+ ScheduleState state = new ScheduleState();
+ state.setVersion("state-" + System.currentTimeMillis());
+ state.setGenerateTime(String.valueOf(new Date().getTime()));
+ OpResult result = dao.addScheduleState(state);
+ Assert.assertEquals(200, result.code);
+ }
+
+ @Test
+ public void test_readCurrentState() {
+ test_addstate();
+ ScheduleState state = dao.getScheduleState();
+ Assert.assertNotNull(state);
+
+ System.out.println(state.getVersion());
+ System.out.println(state.getGenerateTime());
+ }
+
+ private void test_addCompleteScheduleState() {
+ Long timestamp = System.currentTimeMillis();
+ String version = "state-" + timestamp;
+
+ // SpoutSpec
+ Map<String, SpoutSpec> spoutSpecsMap = new HashMap<>();
+ SpoutSpec spoutSpec1 = new SpoutSpec();
+ String topologyId1 = "testUnitTopology1_" + timestamp;
+ spoutSpec1.setTopologyId(topologyId1);
+
+ Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap = new HashMap<>();
+ Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata();
+ kafka2TupleMetadata.setType("KAFKA");
+ kafka2TupleMetadataMap.put("preprocess.network-sherlock.events", kafka2TupleMetadata);
+ spoutSpec1.setKafka2TupleMetadataMap(kafka2TupleMetadataMap);
+
+ Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap= new HashMap<>();
+ List<StreamRepartitionMetadata> StreamRepartitionMetadataList = new ArrayList<>();
+ StreamRepartitionMetadata streamRepartitionMetadata = new StreamRepartitionMetadata();
+ List<StreamRepartitionStrategy> groupingStrategies = new ArrayList();
+ StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
+ streamRepartitionStrategy.setStartSequence(4);
+ groupingStrategies.add(streamRepartitionStrategy);
+ streamRepartitionMetadata.setGroupingStrategies(groupingStrategies);
+ StreamRepartitionMetadataList.add(streamRepartitionMetadata);
+ streamRepartitionMetadataMap.put("preprocess.network-nervecenter.events", StreamRepartitionMetadataList);
+ spoutSpec1.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap);
+ spoutSpecsMap.put(topologyId1, spoutSpec1);
+
+ SpoutSpec spoutSpec2 = new SpoutSpec();
+ String topologyId2 = "testUnitTopology2_" + timestamp;
+ spoutSpec2.setTopologyId(topologyId2);
+ spoutSpec2.setKafka2TupleMetadataMap(kafka2TupleMetadataMap);
+ spoutSpec2.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap);
+ spoutSpecsMap.put(topologyId2, spoutSpec2);
+
+ // Alert Spec
+ Map<String, AlertBoltSpec> alertSpecsMap = new HashMap<>();
+ alertSpecsMap.put(topologyId1, new AlertBoltSpec(topologyId1));
+
+ // GroupSpec
+ Map<String, RouterSpec> groupSpecsMap = new HashMap<>();
+ groupSpecsMap.put(topologyId1, new RouterSpec(topologyId1));
+
+ // PublishSpec
+ Map<String, PublishSpec> pubMap = new HashMap<>();
+ pubMap.put(topologyId1, new PublishSpec(topologyId1, "testPublishBolt"));
+
+ // Policy Snapshots
+ Collection<PolicyDefinition> policySnapshots = new ArrayList<>();
+ PolicyDefinition policy = new PolicyDefinition();
+ policy.setName("testPolicyDefinition");
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00");
+ def.setType("absencealert");
+ policy.setDefinition(def);
+ policySnapshots.add(policy);
+
+ // Stream Snapshots
+ Collection<StreamDefinition> streams = new ArrayList<>();
+ StreamDefinition stream = new StreamDefinition();
+ stream.setStreamId("testStream");
+ streams.add(stream);
+
+ // Monitored Streams
+ Collection<MonitoredStream> monitoredStreams = new ArrayList<>();
+ StreamPartition partition = new StreamPartition();
+ partition.setType(StreamPartition.Type.GLOBAL);
+ partition.setStreamId("s1");
+ partition.setColumns(Arrays.asList("f1", "f2"));
+ StreamGroup sg = new StreamGroup();
+ sg.addStreamPartition(partition);
+ MonitoredStream monitoredStream = new MonitoredStream(sg);
+ monitoredStreams.add(monitoredStream);
+
+ // Assignments
+ Collection<PolicyAssignment> assignments = new ArrayList<>();
+ assignments.add(new PolicyAssignment("syslog_regex", "SG[syslog_stream-]" + timestamp));
+
+ ScheduleState state = new ScheduleState(version, spoutSpecsMap, groupSpecsMap, alertSpecsMap, pubMap,
+ assignments, monitoredStreams, policySnapshots, streams);
+
+ OpResult result = dao.addScheduleState(state);
+ Assert.assertEquals(200, result.code);
+ }
+
+ @Test
+ public void test_readCompleteScheduleState() {
+ test_addCompleteScheduleState();
+
+ ScheduleState state = dao.getScheduleState();
+ Assert.assertNotNull(state);
+ Assert.assertEquals(2, state.getSpoutSpecs().size());
+ Assert.assertEquals(1, state.getAlertSpecs().size());
+ Assert.assertEquals(1, state.getGroupSpecs().size());
+ Assert.assertEquals(1, state.getPublishSpecs().size());
+ Assert.assertEquals(1, state.getPolicySnapshots().size());
+ Assert.assertEquals(1, state.getStreamSnapshots().size());
+ Assert.assertEquals(1, state.getMonitoredStreams().size());
+ Assert.assertEquals(1, state.getAssignments().size());
+
+
+ System.out.println(state.getVersion());
+ System.out.println(state.getGenerateTime());
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java
deleted file mode 100644
index 840f4a7..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.alert.resource.impl;
-
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl;
-import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-/**
- * @since May 1, 2016
- */
-public class InMemoryTest {
-
- private IMetadataDao dao = new InMemMetadataDaoImpl(ConfigFactory.load());
-
- @Test
- public void test_AddPolicy() {
-
- LoggerFactory.getLogger(InMemoryTest.class);
-
- MetadataDaoFactory.getInstance().getMetadataDao();
-
- PolicyDefinition pd = new PolicyDefinition();
- pd.setName("pd1");
- dao.addPolicy(pd);
-
- Assert.assertEquals(1, dao.listPolicies().size());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/JdbcImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/JdbcImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/JdbcImplTest.java
deleted file mode 100644
index 158c0c2..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/JdbcImplTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.service.alert.resource.impl;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.impl.JdbcMetadataDaoImpl;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
-
-public class JdbcImplTest {
- private static Logger LOG = LoggerFactory.getLogger(JdbcImplTest.class);
- static IMetadataDao dao;
-
- @BeforeClass
- public static void setup() {
- System.setProperty("config.resource", "/application-mysql.conf");
- ConfigFactory.invalidateCaches();
- Config config = ConfigFactory.load();
- dao = new JdbcMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA));
- }
-
- @AfterClass
- public static void teardown() {
- if (dao != null) {
- try {
- dao.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- private String TOPO_NAME = "topoName";
-
- @Ignore
- @Test
- public void test_apis() {
- // publishment
- {
- Publishment publishment = new Publishment();
- publishment.setName("pub-");
- OpResult result = dao.addPublishment(publishment);
- Assert.assertEquals(200, result.code);
- List<Publishment> assigns = dao.listPublishment();
- Assert.assertEquals(1, assigns.size());
- result = dao.removePublishment("pub-");
- Assert.assertTrue(200 == result.code);
- }
- // topology
- {
- OpResult result = dao.addTopology(new Topology(TOPO_NAME, 3, 5));
- System.out.println(result.message);
- Assert.assertEquals(200, result.code);
- List<Topology> topos = dao.listTopologies();
- Assert.assertEquals(1, topos.size());
- // add again: replace existing one
- result = dao.addTopology(new Topology(TOPO_NAME, 4, 5));
- topos = dao.listTopologies();
- Assert.assertEquals(1, topos.size());
- Assert.assertEquals(TOPO_NAME, topos.get(0).getName());
- Assert.assertEquals(4, topos.get(0).getNumOfGroupBolt());
- }
- // assignment
- {
- PolicyAssignment assignment = new PolicyAssignment();
- assignment.setPolicyName("policy1");
- OpResult result = dao.addAssignment(assignment);
- Assert.assertEquals(200, result.code);
- List<PolicyAssignment> assigns = dao.listAssignments();
- Assert.assertEquals(1, assigns.size());
- }
- // cluster
- {
- StreamingCluster cluster = new StreamingCluster();
- cluster.setName("dd");
- OpResult result = dao.addCluster(cluster);
- Assert.assertEquals(200, result.code);
- List<StreamingCluster> assigns = dao.listClusters();
- Assert.assertEquals(1, assigns.size());
- }
- // data source
- {
- Kafka2TupleMetadata dataSource = new Kafka2TupleMetadata();
- dataSource.setName("ds");
- OpResult result = dao.addDataSource(dataSource);
- Assert.assertEquals(200, result.code);
- List<Kafka2TupleMetadata> assigns = dao.listDataSources();
- Assert.assertEquals(1, assigns.size());
- }
- // policy
- {
- PolicyDefinition policy = new PolicyDefinition();
- policy.setName("ds");
- OpResult result = dao.addPolicy(policy);
- Assert.assertEquals(200, result.code);
- List<PolicyDefinition> assigns = dao.listPolicies();
- Assert.assertEquals(1, assigns.size());
- }
-
- // publishmentType
- {
- PublishmentType publishmentType = new PublishmentType();
- publishmentType.setType("KAFKA");
- OpResult result = dao.addPublishmentType(publishmentType);
- Assert.assertEquals(200, result.code);
- List<PublishmentType> assigns = dao.listPublishmentType();
- Assert.assertEquals(1, assigns.size());
- }
- }
-
- private void test_addstate() {
- ScheduleState state = new ScheduleState();
- String versionId = "state-" + System.currentTimeMillis();
- state.setVersion(versionId);
- state.setGenerateTime(String.valueOf(new Date().getTime()));
- OpResult result = dao.addScheduleState(state);
- Assert.assertEquals(200, result.code);
- state = dao.getScheduleState();
- Assert.assertEquals(state.getVersion(), versionId);
- }
-
- @Ignore
- @Test
- public void test_readCurrentState() {
- test_addstate();
- ScheduleState state = dao.getScheduleState();
- Assert.assertNotNull(state);
-
- LOG.debug(state.getVersion());
- LOG.debug(state.getGenerateTime());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
deleted file mode 100644
index 4328be3..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.alert.resource.impl;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import de.flapdoodle.embed.mongo.MongodExecutable;
-import de.flapdoodle.embed.mongo.MongodProcess;
-import de.flapdoodle.embed.mongo.MongodStarter;
-import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
-import de.flapdoodle.embed.mongo.config.Net;
-import de.flapdoodle.embed.mongo.distribution.Version;
-import de.flapdoodle.embed.process.runtime.Network;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.eagle.alert.coordination.model.*;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.impl.MongoMetadataDaoImpl;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * @since May 1, 2016
- */
-public class MongoImplTest {
- private static Logger LOG = LoggerFactory.getLogger(MongoImplTest.class);
- static IMetadataDao dao;
-
- private static MongodExecutable mongodExe;
- private static MongodProcess mongod;
-
- public static void before() {
- try {
- MongodStarter starter = MongodStarter.getDefaultInstance();
- mongodExe = starter.prepare(new MongodConfigBuilder().version(Version.V3_2_1)
- .net(new Net(27017, Network.localhostIsIPv6())).build());
- mongod = mongodExe.start();
- } catch (Exception e) {
- LOG.error("start embed mongod failed, assume some external mongo running. continue run test!", e);
- }
- }
-
- @BeforeClass
- public static void setup() {
- before();
-
- System.setProperty("config.resource", "/application-mongo.conf");
- ConfigFactory.invalidateCaches();
- Config config = ConfigFactory.load();
- dao = new MongoMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA));
-
- }
-
- @AfterClass
- public static void teardown() {
- if (mongod != null) {
- try {
- mongod.stop();
- } catch (IllegalStateException e) {
- // catch this exception for the unstable stopping mongodb
- // reason: the exception is usually thrown out with below message format when stop() returns null value,
- // but actually this should have been captured in ProcessControl.stopOrDestroyProcess() by destroying
- // the process ultimately
- if (e.getMessage() != null && e.getMessage().matches("^Couldn't kill.*process!.*")) {
- // if matches, do nothing, just ignore the exception
- } else {
- LOG.warn(String.format("Ignored error for stopping mongod process, see stack trace: %s", ExceptionUtils.getStackTrace(e)));
- }
- }
- mongodExe.stop();
- }
- }
-
- private String TOPO_NAME = "topoName";
-
- @Test
- public void test_apis() throws Exception {
- // topology
- {
- OpResult result = dao.addTopology(new Topology(TOPO_NAME, 3, 5));
- System.out.println(result.message);
- Assert.assertEquals(200, result.code);
- List<Topology> topos = dao.listTopologies();
- Assert.assertEquals(1, topos.size());
-
- result = dao.addTopology(new Topology(TOPO_NAME + "-new", 3, 5));
- topos = dao.listTopologies();
- Assert.assertEquals(2, topos.size());
- // add again: replace existing one
- result = dao.addTopology(new Topology(TOPO_NAME, 4, 5));
- topos = dao.listTopologies();
- Assert.assertEquals(2, topos.size());
- Assert.assertEquals(TOPO_NAME, topos.get(0).getName());
- Assert.assertEquals(4, topos.get(0).getNumOfGroupBolt());
- }
- // assignment
- {
- PolicyAssignment assignment = new PolicyAssignment();
- assignment.setPolicyName("policy1");
- OpResult result = dao.addAssignment(assignment);
- Assert.assertEquals(200, result.code);
- List<PolicyAssignment> assigns = dao.listAssignments();
- Assert.assertEquals(1, assigns.size());
- }
- // cluster
- {
- StreamingCluster cluster = new StreamingCluster();
- cluster.setName("dd");
- OpResult result = dao.addCluster(cluster);
- Assert.assertEquals(200, result.code);
- List<StreamingCluster> assigns = dao.listClusters();
- Assert.assertEquals(1, assigns.size());
- }
- // data source
- {
- Kafka2TupleMetadata dataSource = new Kafka2TupleMetadata();
- dataSource.setName("ds");
- OpResult result = dao.addDataSource(dataSource);
- Assert.assertEquals(200, result.code);
- List<Kafka2TupleMetadata> assigns = dao.listDataSources();
- Assert.assertEquals(1, assigns.size());
- }
- // policy
- {
- PolicyDefinition policy = new PolicyDefinition();
- policy.setName("ds");
- OpResult result = dao.addPolicy(policy);
- Assert.assertEquals(200, result.code);
- List<PolicyDefinition> assigns = dao.listPolicies();
- Assert.assertEquals(1, assigns.size());
- }
- // publishment
- {
- Publishment publishment = new Publishment();
- publishment.setName("pub-");
- OpResult result = dao.addPublishment(publishment);
- Assert.assertEquals(200, result.code);
- List<Publishment> assigns = dao.listPublishment();
- Assert.assertEquals(1, assigns.size());
- }
- // publishmentType
- {
- PublishmentType publishmentType = new PublishmentType();
- publishmentType.setType("KAFKA");
- OpResult result = dao.addPublishmentType(publishmentType);
- Assert.assertEquals(200, result.code);
- List<PublishmentType> assigns = dao.listPublishmentType();
- Assert.assertEquals(1, assigns.size());
- }
-
- // schedule state
- {
- ScheduleState state = new ScheduleState();
- state.setVersion("001");
- state.setScheduleTimeMillis(3000);
- state.setCode(200);
- OpResult result = dao.addScheduleState(state);
- Assert.assertEquals(200, result.code);
-
- Thread.sleep(1000);
-
- state = new ScheduleState();
- state.setScheduleTimeMillis(3000);
- state.setVersion("002");
- state.setCode(201);
- result = dao.addScheduleState(state);
- Assert.assertEquals(200, result.code);
-
- ScheduleState getState = dao.getScheduleState();
- Assert.assertEquals(201, getState.getCode());
- }
- // stream
- {
- StreamDefinition stream = new StreamDefinition();
- stream.setStreamId("stream");
- OpResult result = dao.createStream(stream);
- Assert.assertEquals(200, result.code);
- List<StreamDefinition> assigns = dao.listStreams();
- Assert.assertEquals(1, assigns.size());
- }
- // alert
- {
- AlertPublishEvent alert = new AlertPublishEvent();
- alert.setAlertTimestamp(System.currentTimeMillis());
- alert.setAlertId(UUID.randomUUID().toString());
- OpResult result = dao.addAlertPublishEvent(alert);
- Assert.assertEquals(200, result.code);
- List<AlertPublishEvent> alerts = dao.listAlertPublishEvent(2);
- Assert.assertEquals(1, alerts.size());
- }
- }
-
- private void test_addstate() {
- ScheduleState state = new ScheduleState();
- state.setVersion("state-" + System.currentTimeMillis());
- state.setGenerateTime(String.valueOf(new Date().getTime()));
- OpResult result = dao.addScheduleState(state);
- Assert.assertEquals(200, result.code);
- }
-
- @Test
- public void test_readCurrentState() {
- test_addstate();
- ScheduleState state = dao.getScheduleState();
- Assert.assertNotNull(state);
-
- System.out.println(state.getVersion());
- System.out.println(state.getGenerateTime());
- }
-
- private void test_addCompleteScheduleState() {
- Long timestamp = System.currentTimeMillis();
- String version = "state-" + timestamp;
-
- // SpoutSpec
- Map<String, SpoutSpec> spoutSpecsMap = new HashMap<>();
- SpoutSpec spoutSpec1 = new SpoutSpec();
- String topologyId1 = "testUnitTopology1_" + timestamp;
- spoutSpec1.setTopologyId(topologyId1);
-
- Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap = new HashMap<>();
- Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata();
- kafka2TupleMetadata.setType("KAFKA");
- kafka2TupleMetadataMap.put("preprocess.network-sherlock.events", kafka2TupleMetadata);
- spoutSpec1.setKafka2TupleMetadataMap(kafka2TupleMetadataMap);
-
- Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap= new HashMap<>();
- List<StreamRepartitionMetadata> StreamRepartitionMetadataList = new ArrayList<>();
- StreamRepartitionMetadata streamRepartitionMetadata = new StreamRepartitionMetadata();
- List<StreamRepartitionStrategy> groupingStrategies = new ArrayList();
- StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
- streamRepartitionStrategy.setStartSequence(4);
- groupingStrategies.add(streamRepartitionStrategy);
- streamRepartitionMetadata.setGroupingStrategies(groupingStrategies);
- StreamRepartitionMetadataList.add(streamRepartitionMetadata);
- streamRepartitionMetadataMap.put("preprocess.network-nervecenter.events", StreamRepartitionMetadataList);
- spoutSpec1.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap);
- spoutSpecsMap.put(topologyId1, spoutSpec1);
-
- SpoutSpec spoutSpec2 = new SpoutSpec();
- String topologyId2 = "testUnitTopology2_" + timestamp;
- spoutSpec2.setTopologyId(topologyId2);
- spoutSpec2.setKafka2TupleMetadataMap(kafka2TupleMetadataMap);
- spoutSpec2.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap);
- spoutSpecsMap.put(topologyId2, spoutSpec2);
-
- // Alert Spec
- Map<String, AlertBoltSpec> alertSpecsMap = new HashMap<>();
- alertSpecsMap.put(topologyId1, new AlertBoltSpec(topologyId1));
-
- // GroupSpec
- Map<String, RouterSpec> groupSpecsMap = new HashMap<>();
- groupSpecsMap.put(topologyId1, new RouterSpec(topologyId1));
-
- // PublishSpec
- Map<String, PublishSpec> pubMap = new HashMap<>();
- pubMap.put(topologyId1, new PublishSpec(topologyId1, "testPublishBolt"));
-
- // Policy Snapshots
- Collection<PolicyDefinition> policySnapshots = new ArrayList<>();
- PolicyDefinition policy = new PolicyDefinition();
- policy.setName("testPolicyDefinition");
- PolicyDefinition.Definition def = new PolicyDefinition.Definition();
- def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00");
- def.setType("absencealert");
- policy.setDefinition(def);
- policySnapshots.add(policy);
-
- // Stream Snapshots
- Collection<StreamDefinition> streams = new ArrayList<>();
- StreamDefinition stream = new StreamDefinition();
- stream.setStreamId("testStream");
- streams.add(stream);
-
- // Monitored Streams
- Collection<MonitoredStream> monitoredStreams = new ArrayList<>();
- StreamPartition partition = new StreamPartition();
- partition.setType(StreamPartition.Type.GLOBAL);
- partition.setStreamId("s1");
- partition.setColumns(Arrays.asList("f1", "f2"));
- StreamGroup sg = new StreamGroup();
- sg.addStreamPartition(partition);
- MonitoredStream monitoredStream = new MonitoredStream(sg);
- monitoredStreams.add(monitoredStream);
-
- // Assignments
- Collection<PolicyAssignment> assignments = new ArrayList<>();
- assignments.add(new PolicyAssignment("syslog_regex", "SG[syslog_stream-]" + timestamp));
-
- ScheduleState state = new ScheduleState(version, spoutSpecsMap, groupSpecsMap, alertSpecsMap, pubMap,
- assignments, monitoredStreams, policySnapshots, streams);
-
- OpResult result = dao.addScheduleState(state);
- Assert.assertEquals(200, result.code);
- }
-
- @Test
- public void test_readCompleteScheduleState() {
- test_addCompleteScheduleState();
-
- ScheduleState state = dao.getScheduleState();
- Assert.assertNotNull(state);
- Assert.assertEquals(2, state.getSpoutSpecs().size());
- Assert.assertEquals(1, state.getAlertSpecs().size());
- Assert.assertEquals(1, state.getGroupSpecs().size());
- Assert.assertEquals(1, state.getPublishSpecs().size());
- Assert.assertEquals(1, state.getPolicySnapshots().size());
- Assert.assertEquals(1, state.getStreamSnapshots().size());
- Assert.assertEquals(1, state.getMonitoredStreams().size());
- Assert.assertEquals(1, state.getAssignments().size());
-
-
- System.out.println(state.getVersion());
- System.out.println(state.getGenerateTime());
-
-
- }
-}