You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/08/11 06:55:13 UTC
[1/2] incubator-eagle git commit: EAGLE-440: Alert mongodb storage
refine
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 08abde513 -> a772a0556
EAGLE-440: Alert mongodb storage refine
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/678437aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/678437aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/678437aa
Branch: refs/heads/develop
Commit: 678437aadb5ca6e5c7256fabd40b4966e4227911
Parents: e2532a1
Author: mizeng <mi...@ebaysf.com>
Authored: Wed Aug 10 16:04:16 2016 +0800
Committer: mizeng <mi...@ebaysf.com>
Committed: Thu Aug 11 10:52:38 2016 +0800
----------------------------------------------------------------------
.../model/internal/ScheduleStateBase.java | 86 ++++++
.../metadata/impl/MongoMetadataDaoImpl.java | 298 +++++++++++++++----
.../alert/resource/impl/MongoImplTest.java | 121 ++++++--
3 files changed, 434 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/678437aa/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
new file mode 100644
index 0000000..a1efbf9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+/**
+ *
+ * This is the Base part of ScheduleState, only contains version/generateTime/code/message/scheduleTimeMillis
+ *
+ *
+ * @since Aug 10, 2016
+ *
+ */
+public class ScheduleStateBase {
+ private String version;
+ // FIXME : should be date, can not make it simple in mongo..
+ private String generateTime;
+ private int code = 200;
+ private String message = "OK";
+ private int scheduleTimeMillis;
+
+ public ScheduleStateBase(String version, String generateTime, int code, String message, int scheduleTimeMillis) {
+ this.version = version;
+ this.generateTime = generateTime;
+ this.code = code;
+ this.message = message;
+ this.scheduleTimeMillis = scheduleTimeMillis;
+ }
+
+ public int getScheduleTimeMillis() {
+ return scheduleTimeMillis;
+ }
+
+ public void setScheduleTimeMillis(int scheduleTimeMillis) {
+ this.scheduleTimeMillis = scheduleTimeMillis;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ public String getGenerateTime() {
+ return generateTime;
+ }
+
+ public void setGenerateTime(String generateTime) {
+ this.generateTime = generateTime;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/678437aa/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
index a990b13..d68dc6a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
@@ -16,20 +16,26 @@
*/
package org.apache.eagle.alert.metadata.impl;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
+import com.mongodb.Block;
+import com.mongodb.Function;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientURI;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.IndexOptions;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.result.DeleteResult;
+import com.mongodb.client.result.UpdateResult;
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.coordination.model.*;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.ScheduleStateBase;
import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.engine.coordinator.*;
import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.MetadataUtils;
import org.apache.eagle.alert.metadata.resource.Models;
@@ -41,18 +47,11 @@ import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.mongodb.Function;
-import com.mongodb.MongoClient;
-import com.mongodb.MongoClientURI;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.IndexOptions;
-import com.mongodb.client.model.UpdateOptions;
-import com.mongodb.client.result.DeleteResult;
-import com.mongodb.client.result.UpdateResult;
-import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
/**
* @since Apr 11, 2016
@@ -77,9 +76,18 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
private MongoCollection<Document> policy;
private MongoCollection<Document> publishment;
private MongoCollection<Document> publishmentType;
+ private MongoCollection<Document> topologies;
+
+ // scheduleStates splits to several collections
private MongoCollection<Document> scheduleStates;
+ private MongoCollection<Document> spoutSpecs;
+ private MongoCollection<Document> alertSpecs;
+ private MongoCollection<Document> groupSpecs;
+ private MongoCollection<Document> publishSpecs;
+ private MongoCollection<Document> policySnapshots;
+ private MongoCollection<Document> streamSnapshots;
+ private MongoCollection<Document> monitoredStreams;
private MongoCollection<Document> assignments;
- private MongoCollection<Document> topologies;
@Inject
public MongoMetadataDaoImpl(Config config) {
@@ -118,21 +126,46 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
publishmentType.createIndex(doc1, io1);
}
+ // below is for schedule_specs and its splitted collections
+ BsonDocument doc1 = new BsonDocument();
+ IndexOptions io1 = new IndexOptions().background(true).unique(true).name("versionIndex");
+ doc1.append("version", new BsonInt32(1));
scheduleStates = db.getCollection("schedule_specs");
+ scheduleStates.createIndex(doc1, io1);
+
+ spoutSpecs = db.getCollection("spoutSpecs");
{
- IndexOptions io1 = new IndexOptions().background(true).unique(true).name("nameIndex");
- BsonDocument doc1 = new BsonDocument();
- doc1.append("version", new BsonInt32(1));
- scheduleStates.createIndex(doc1, io1);
+ IndexOptions io_internal = new IndexOptions().background(true).unique(true).name("topologyIdIndex");
+ BsonDocument doc_internal = new BsonDocument();
+ doc_internal.append("topologyId", new BsonInt32(1));
+ spoutSpecs.createIndex(doc_internal, io_internal);
}
- assignments = db.getCollection("assignments");
+ alertSpecs = db.getCollection("alertSpecs");
{
- IndexOptions io1 = new IndexOptions().background(true).unique(true).name("policyNameIndex");
- BsonDocument doc1 = new BsonDocument();
- doc1.append("policyName", new BsonInt32(1));
- assignments.createIndex(doc1, io1);
+ IndexOptions io_internal = new IndexOptions().background(true).unique(true).name("topologyNameIndex");
+ BsonDocument doc_internal = new BsonDocument();
+ doc_internal.append("topologyName", new BsonInt32(1));
+ alertSpecs.createIndex(doc_internal, io_internal);
}
+
+ groupSpecs = db.getCollection("groupSpecs");
+ groupSpecs.createIndex(doc1, io1);
+
+ publishSpecs = db.getCollection("publishSpecs");
+ publishSpecs.createIndex(doc1, io1);
+
+ policySnapshots = db.getCollection("policySnapshots");
+ policySnapshots.createIndex(doc1, io);
+
+ streamSnapshots = db.getCollection("streamSnapshots");
+ streamSnapshots.createIndex(doc1, io);
+
+ monitoredStreams = db.getCollection("monitoredStreams");
+ monitoredStreams.createIndex(doc1, io);
+
+ assignments = db.getCollection("assignments");
+ assignments.createIndex(doc1, io1);
}
@Override
@@ -285,6 +318,21 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
return remove(publishmentType, pubType);
}
+ private <T> OpResult addOne(MongoCollection<Document> collection, T t) {
+ OpResult result = new OpResult();
+ try {
+ String json = mapper.writeValueAsString(t);
+ collection.insertOne(Document.parse(json));
+ result.code = 200;
+ result.message = String.format("add one document to collection %s succeed!", collection.getNamespace());
+ } catch (Exception e) {
+ result.code = 400;
+ result.message = e.getMessage();
+ LOG.error("", e);
+ }
+ return result;
+ }
+
@Override
public ScheduleState getScheduleState(String versionId) {
BsonDocument doc = new BsonDocument();
@@ -301,29 +349,21 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
return null;
}
}).first();
- return state;
- }
-
- @Override
- public OpResult addScheduleState(ScheduleState state) {
- return addOne(scheduleStates, state);
- }
- private <T> OpResult addOne(MongoCollection<Document> collection, T t) {
- OpResult result = new OpResult();
- try {
- String json = mapper.writeValueAsString(t);
- collection.insertOne(Document.parse(json));
- result.code = 200;
- result.message = "add state succeed!";
- } catch (Exception e) {
- result.code = 400;
- result.message = e.getMessage();
- LOG.error("", e);
+ if (state != null){
+ // based on version, to add content from collections of spoutSpecs/alertSpecs/etc..
+ state = addDetailForScheduleState(state, versionId);
}
- return result;
+
+ return state;
}
+ /***
+ * get the basic ScheduleState, and then based on the version to get all sub-part(spoutSpecs/alertSpecs/etc)
+ * to form a completed ScheduleState.
+ *
+ * @return the latest ScheduleState
+ */
@Override
public ScheduleState getScheduleState() {
BsonDocument sort = new BsonDocument();
@@ -340,9 +380,163 @@ public class MongoMetadataDaoImpl implements IMetadataDao {
return null;
}
}).first();
+
+ if (state != null){
+ String version = state.getVersion();
+ // based on version, to add content from collections of spoutSpecs/alertSpecs/etc..
+ state = addDetailForScheduleState(state, version);
+ }
+
+ return state;
+ }
+
+ private ScheduleState addDetailForScheduleState(ScheduleState state, String version){
+ Map<String, SpoutSpec> spoutMaps = maps(spoutSpecs, SpoutSpec.class, version);
+ if (spoutMaps.size() !=0){
+ state.setSpoutSpecs(spoutMaps);
+ }
+
+ Map<String, AlertBoltSpec> alertMaps = maps(alertSpecs, AlertBoltSpec.class, version);
+ if (alertMaps.size() !=0){
+ state.setAlertSpecs(alertMaps);
+ }
+
+ Map<String, RouterSpec> groupMaps = maps(groupSpecs, RouterSpec.class, version);
+ if (groupMaps.size() !=0){
+ state.setGroupSpecs(groupMaps);
+ }
+
+ Map<String, PublishSpec> publishMaps = maps(publishSpecs, PublishSpec.class, version);
+ if (publishMaps.size() !=0){
+ state.setPublishSpecs(publishMaps);
+ }
+
+ List<VersionedPolicyDefinition> policyLists = list(policySnapshots, VersionedPolicyDefinition.class, version);
+ if (policyLists.size() !=0){
+ state.setPolicySnapshots(policyLists);
+ }
+
+ List<VersionedStreamDefinition> streamLists = list(streamSnapshots, VersionedStreamDefinition.class, version);
+ if (streamLists.size() !=0){
+ state.setStreamSnapshots(streamLists);
+ }
+
+ List<MonitoredStream> monitorLists = list(monitoredStreams, MonitoredStream.class, version);
+ if (monitorLists.size() !=0){
+ state.setMonitoredStreams(monitorLists);
+ }
+
+ List<PolicyAssignment> assignmentLists = list(assignments, PolicyAssignment.class, version);
+ if (assignmentLists.size() !=0){
+ state.setAssignments(assignmentLists);
+ }
return state;
}
+ private <T> Map<String, T> maps(MongoCollection<Document> collection, Class<T> clz, String version){
+ BsonDocument doc = new BsonDocument();
+ doc.append("version", new BsonString(version));
+
+ Map<String, T> maps = new HashMap<String, T>();
+ String mapKey = (clz == SpoutSpec.class)? "topologyId" : "topologyName";
+ collection.find(doc).forEach(new Block<Document>() {
+ @Override
+ public void apply(Document document) {
+ String json = document.toJson();
+ try {
+ maps.put(document.getString(mapKey), mapper.readValue(json, clz));
+ } catch (IOException e) {
+ LOG.error("deserialize config item failed!", e);
+ }
+ }
+ });
+
+ return maps;
+ }
+
+ private <T> List<T> list(MongoCollection<Document> collection, Class<T> clz, String version){
+ BsonDocument doc = new BsonDocument();
+ doc.append("version", new BsonString(version));
+
+ List<T> result = new LinkedList<T>();
+ collection.find(doc).map(new Function<Document, T>() {
+ @Override
+ public T apply(Document t) {
+ String json = t.toJson();
+ try {
+ return mapper.readValue(json, clz);
+ } catch (IOException e) {
+ LOG.error("deserialize config item failed!", e);
+ }
+ return null;
+ }
+ }).into(result);
+ return result;
+ }
+
+ /***
+ * write ScheduleState to several collections. basic info writes to ScheduleState, other writes to collections
+ * named by spoutSpecs/alertSpecs/etc.
+ *
+ * @param state
+ * @return
+ */
+ @Override
+ public OpResult addScheduleState(ScheduleState state) {
+ OpResult result = new OpResult();
+ try {
+ for (String key: state.getSpoutSpecs().keySet()){
+ SpoutSpec spoutSpec = state.getSpoutSpecs().get(key);
+ addOne(spoutSpecs, spoutSpec);
+ }
+
+ for (String key: state.getAlertSpecs().keySet()){
+ AlertBoltSpec alertBoltSpec = state.getAlertSpecs().get(key);
+ addOne(alertSpecs, alertBoltSpec);
+ }
+
+ for (String key: state.getGroupSpecs().keySet()){
+ RouterSpec groupSpec = state.getGroupSpecs().get(key);
+ addOne(groupSpecs, groupSpec);
+ }
+
+ for (String key: state.getPublishSpecs().keySet()){
+ PublishSpec publishSpec = state.getPublishSpecs().get(key);
+ addOne(publishSpecs, publishSpec);
+ }
+
+ for (VersionedPolicyDefinition policySnapshot: state.getPolicySnapshots()){
+ addOne(policySnapshots, policySnapshot);
+ }
+
+ for (VersionedStreamDefinition streamSnapshot: state.getStreamSnapshots()){
+ addOne(streamSnapshots, streamSnapshot);
+ }
+
+ for (MonitoredStream monitoredStream: state.getMonitoredStreams()){
+ addOne(monitoredStreams, monitoredStream);
+ }
+
+ for (PolicyAssignment assignment: state.getAssignments()){
+ addOne(assignments, assignment);
+ }
+
+ ScheduleStateBase stateBase = new ScheduleStateBase(
+ state.getVersion(), state.getGenerateTime(), state.getCode(),
+ state.getMessage(), state.getScheduleTimeMillis());
+
+ addOne(scheduleStates, stateBase);
+
+ result.code = 200;
+ result.message = "add document to collection schedule_specs succeed";
+ } catch (Exception e) {
+ result.code = 400;
+ result.message = e.getMessage();
+ LOG.error("", e);
+ }
+ return result;
+ }
+
@Override
public List<PolicyAssignment> listAssignments() {
return list(assignments, PolicyAssignment.class);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/678437aa/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
index 5b38776..a48ee7d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
@@ -16,17 +16,21 @@
*/
package org.apache.eagle.service.alert.resource.impl;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodProcess;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.runtime.Network;
+import org.apache.eagle.alert.coordination.model.*;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.engine.coordinator.*;
import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.impl.MongoMetadataDaoImpl;
import org.apache.eagle.alert.metadata.resource.OpResult;
@@ -37,16 +41,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import de.flapdoodle.embed.mongo.MongodExecutable;
-import de.flapdoodle.embed.mongo.MongodProcess;
-import de.flapdoodle.embed.mongo.MongodStarter;
-import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
-import de.flapdoodle.embed.mongo.config.Net;
-import de.flapdoodle.embed.mongo.distribution.Version;
-import de.flapdoodle.embed.process.runtime.Network;
+import java.util.*;
/**
* @since May 1, 2016
@@ -206,4 +201,92 @@ public class MongoImplTest {
System.out.println(state.getVersion());
System.out.println(state.getGenerateTime());
}
+
+ private void test_addCompleteScheduleState() {
+ Long timestamp = System.currentTimeMillis();
+ String version = "state-" + timestamp;
+
+ // SpoutSpec
+ Map<String, SpoutSpec> spoutSpecsMap = new HashMap<>();
+ SpoutSpec spoutSpec1 = new SpoutSpec();
+ String topologyId1 = "testUnitTopology1_" + timestamp;
+ spoutSpec1.setTopologyId(topologyId1);
+ spoutSpecsMap.put(topologyId1, spoutSpec1);
+
+ SpoutSpec spoutSpec2 = new SpoutSpec();
+ String topologyId2 = "testUnitTopology2_" + timestamp;
+ spoutSpec2.setTopologyId(topologyId2);
+ spoutSpecsMap.put(topologyId2, spoutSpec2);
+
+ // Alert Spec
+ Map<String, AlertBoltSpec> alertSpecsMap = new HashMap<>();
+ alertSpecsMap.put(topologyId1, new AlertBoltSpec(topologyId1));
+
+ // GroupSpec
+ Map<String, RouterSpec> groupSpecsMap = new HashMap<>();
+ groupSpecsMap.put(topologyId1, new RouterSpec(topologyId1));
+
+ // PublishSpec
+ Map<String, PublishSpec> pubMap = new HashMap<>();
+ pubMap.put(topologyId1, new PublishSpec(topologyId1, "testPublishBolt"));
+
+ // Policy Snapshots
+ Collection<PolicyDefinition> policySnapshots = new ArrayList<>();
+ PolicyDefinition policy = new PolicyDefinition();
+ policy.setName("testPolicyDefinition");
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00");
+ def.setType("absencealert");
+ policy.setDefinition(def);
+ policySnapshots.add(policy);
+
+ // Stream Snapshots
+ Collection<StreamDefinition> streams = new ArrayList<>();
+ StreamDefinition stream = new StreamDefinition();
+ stream.setStreamId("testStream");
+ streams.add(stream);
+
+ // Monitored Streams
+ Collection<MonitoredStream> monitoredStreams = new ArrayList<>();
+ StreamPartition partition = new StreamPartition();
+ partition.setType(StreamPartition.Type.GLOBAL);
+ partition.setStreamId("s1");
+ partition.setColumns(Arrays.asList("f1", "f2"));
+ StreamGroup sg = new StreamGroup();
+ sg.addStreamPartition(partition);
+ MonitoredStream monitoredStream = new MonitoredStream(sg);
+ monitoredStreams.add(monitoredStream);
+
+ // Assignments
+ Collection<PolicyAssignment> assignments = new ArrayList<>();
+ assignments.add(new PolicyAssignment("syslog_regex", "SG[syslog_stream-]"+timestamp));
+
+ ScheduleState state = new ScheduleState(version, spoutSpecsMap, groupSpecsMap, alertSpecsMap, pubMap,
+ assignments, monitoredStreams, policySnapshots, streams);
+
+ OpResult result = dao.addScheduleState(state);
+ Assert.assertEquals(200, result.code);
+ }
+
+ @Test
+ public void test_readCompleteScheduleState() {
+ test_addCompleteScheduleState();
+
+ ScheduleState state = dao.getScheduleState();
+ Assert.assertNotNull(state);
+ Assert.assertEquals(2, state.getSpoutSpecs().size());
+ Assert.assertEquals(1, state.getAlertSpecs().size());
+ Assert.assertEquals(1, state.getGroupSpecs().size());
+ Assert.assertEquals(1, state.getPublishSpecs().size());
+ Assert.assertEquals(1, state.getPolicySnapshots().size());
+ Assert.assertEquals(1, state.getStreamSnapshots().size());
+ Assert.assertEquals(1, state.getMonitoredStreams().size());
+ Assert.assertEquals(1, state.getAssignments().size());
+
+
+ System.out.println(state.getVersion());
+ System.out.println(state.getGenerateTime());
+
+
+ }
}
[2/2] incubator-eagle git commit: EAGLE-440: Alert mongodb storage
refine Split schedule specs into different mongo collections to avoid too big
mongo document
Posted by ra...@apache.org.
EAGLE-440: Alert mongodb storage refine
Split schedule specs into different mongo collections to avoid too big mongo document
Author: Zeng, Bryant
Committer: ralphsu
This closes #328
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/a772a055
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/a772a055
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/a772a055
Branch: refs/heads/develop
Commit: a772a05568ac30fcfbff21cfaf94e72022b6fe68
Parents: 08abde5 678437a
Author: Ralph, Su <su...@gmail.com>
Authored: Thu Aug 11 14:54:00 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Thu Aug 11 14:54:00 2016 +0800
----------------------------------------------------------------------
.../model/internal/ScheduleStateBase.java | 86 ++++++
.../metadata/impl/MongoMetadataDaoImpl.java | 298 +++++++++++++++----
.../alert/resource/impl/MongoImplTest.java | 121 ++++++--
3 files changed, 434 insertions(+), 71 deletions(-)
----------------------------------------------------------------------