You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/06/01 05:56:30 UTC
[15/19] incubator-eagle git commit: EAGLE-324: Init branch-v0.5
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
new file mode 100644
index 0000000..e755237
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl.strategies;
+
+import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.CoordinatorConstants;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.TopologyMgmtService;
+import org.apache.eagle.alert.coordinator.TopologyMgmtService.TopologyMeta;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * A simple strategy that only looks for free alert bolts inside one single
+ * topology when work slots are requested.
+ *
+ * Invariant:<br/>
+ * One slot queue lives on exactly one topology.<br/>
+ * One topology doesn't contain two slot queues of the same partition.
+ *
+ * @since Apr 27, 2016
+ *
+ */
+public class SameTopologySlotStrategy implements IWorkSlotStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SameTopologySlotStrategy.class);
+
+    private final IScheduleContext context;
+    private final StreamGroup partitionGroup;
+    private final TopologyMgmtService mgmtService;
+
+    // topologies whose load is above this bound are not used for new queues
+    private final double topoLoadUpbound;
+
+    public SameTopologySlotStrategy(IScheduleContext context, StreamGroup streamPartitionGroup,
+            TopologyMgmtService mgmtService) {
+        this.context = context;
+        this.partitionGroup = streamPartitionGroup;
+        this.mgmtService = mgmtService;
+
+        Config config = ConfigFactory.load().getConfig(CoordinatorConstants.CONFIG_ITEM_COORDINATOR);
+        // numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT);
+        topoLoadUpbound = config.getDouble(CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND);
+    }
+
+    /**
+     * Reserves {@code size} free alert bolts on one single topology; if no known
+     * topology can host them, a new one is requested from the management service.
+     * A non-positive {@code size} returns an empty list without creating anything.
+     * @param isDedicated - not used yet!
+     */
+    @Override
+    public List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties) {
+        if (size <= 0) {
+            // nothing to reserve: never spin up a topology for an empty request
+            return Collections.emptyList();
+        }
+        // priority strategy first???
+        List<WorkSlot> slots = new ArrayList<WorkSlot>();
+        for (Topology t : context.getTopologies().values()) {
+            if (t.getNumOfAlertBolt() >= size && getQueueOnTopology(size, slots, t)) {
+                break;
+            }
+        }
+        if (slots.isEmpty()) {
+            int supportedSize = mgmtService.getNumberOfAlertBoltsInTopology();
+            if (size > supportedSize) {
+                LOG.error("can not find available slots for queue, required size {}, supported size {} !", size, supportedSize);
+                return Collections.emptyList();
+            }
+            TopologyMeta topoMeta = mgmtService.creatTopology();
+            if (topoMeta == null) {
+                LOG.error("can not create topology for given queue requirement, required size {}, requried partition group {} !", size, partitionGroup);
+                return Collections.emptyList();
+            }
+            context.getTopologies().put(topoMeta.topologyId, topoMeta.topology);
+            context.getTopologyUsages().put(topoMeta.topologyId, topoMeta.usage);
+            if (!getQueueOnTopology(size, slots, topoMeta.topology)) {
+                LOG.error("can not find available slots from new created topology, required size {}. This indicates an error !", size);
+            }
+        }
+        return slots;
+    }
+
+    /**
+     * Tries to pick {@code size} available alert bolts of topology {@code t}; on
+     * success one work slot per picked bolt is appended to {@code slots}.
+     */
+    private boolean getQueueOnTopology(int size, List<WorkSlot> slots, Topology t) {
+        TopologyUsage u = context.getTopologyUsages().get(t.getName());
+        if (!isTopologyAvailable(u)) {
+            return false;
+        }
+        List<String> bolts = new ArrayList<String>();
+        for (AlertBoltUsage alertUsage : u.getAlertUsages().values()) {
+            if (isBoltAvailable(alertUsage)) {
+                bolts.add(alertUsage.getBoltId());
+            }
+            if (bolts.size() == size) {
+                break;
+            }
+        }
+        if (bolts.size() != size) {
+            return false;
+        }
+        for (String boltId : bolts) {
+            slots.add(new WorkSlot(t.getName(), boltId));
+        }
+        return true;
+    }
+
+    /**
+     * A topology can take new queues only when its usage is known and its load
+     * is not above the configured upper bound.
+     */
+    private boolean isTopologyAvailable(TopologyUsage u) {
+        // for (MonitoredStream stream : u.getMonitoredStream()) {
+        //     if (partition.equals(stream.getStreamParitition())) {
+        //         return false;
+        //     }
+        // }
+        return u != null && !(u.getLoad() > topoLoadUpbound);
+    }
+
+    /**
+     * A bolt is available only while no slot queue is recorded on it.
+     */
+    private boolean isBoltAvailable(AlertBoltUsage alertUsage) {
+        // FIXME : more detail to compare on queue exclusion check
+        // a policy-count bound was once considered instead:
+        // return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt;
+        return alertUsage.getQueueSize() <= 0;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
new file mode 100644
index 0000000..e9148f5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+/**
+ * Usage record of one alert bolt: its policies, stream groups, slot queues and load.
+ * @since Mar 28, 2016
+ */
+public class AlertBoltUsage {
+
+    private String boltId;
+    private List<String> policies = new ArrayList<>();
+    // the stream partition groups that are scheduled for this alert bolt
+    private List<StreamGroup> partitions = new ArrayList<>();
+    // the slot queues that are scheduled for this alert bolt
+    private List<StreamWorkSlotQueue> referQueues = new ArrayList<>();
+    private double load;
+
+    public AlertBoltUsage(String anid) {
+        boltId = anid;
+    }
+
+    public String getBoltId() {
+        return this.boltId;
+    }
+
+    public void setBoltId(String boltId) {
+        this.boltId = boltId;
+    }
+
+    /** Returns the internal list of policy names itself, not a copy. */
+    public List<String> getPolicies() {
+        return this.policies;
+    }
+
+    /** Records the name of the given policy on this bolt. */
+    public void addPolicies(PolicyDefinition pd) {
+        this.policies.add(pd.getName());
+        // disabled: for (StreamPartition par : pd.getPartitionSpec()) partitions.add(par);
+    }
+
+    public double getLoad() {
+        return this.load;
+    }
+
+    public void setLoad(double load) {
+        this.load = load;
+    }
+
+    public List<StreamGroup> getPartitions() {
+        return this.partitions;
+    }
+
+    public List<StreamWorkSlotQueue> getReferQueues() {
+        return this.referQueues;
+    }
+
+    /** Returns the number of slot queues recorded for this bolt. */
+    public int getQueueSize() {
+        return this.referQueues.size();
+    }
+    /** Records a slot queue together with the stream group passed along with it. */
+    public void addQueue(StreamGroup streamPartition, StreamWorkSlotQueue queue) {
+        partitions.add(streamPartition);
+        referQueues.add(queue);
+    }
+    /** Forgets the queue; the list of stream groups is left as it is. */
+    public void removeQueue(StreamWorkSlotQueue queue) {
+        referQueues.remove(queue);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
new file mode 100644
index 0000000..86238d1
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.model;
+
+
+/**
+ * Usage record of a single group bolt: its id and its load.
+ * @since Mar 28, 2016
+ */
+public class GroupBoltUsage {
+
+    private String boltId;
+    private double load;
+
+    // Bookkeeping that is currently disabled, kept for reference:
+    // private final Set<String> streams = new HashSet<String>();
+    // private final Map<String, StreamFilter> filters = new HashMap<String, StreamFilter>();
+    // private final Map<String, List<StreamPartition>> groupByMeta;
+
+    public GroupBoltUsage(String boltId) {
+        this.boltId = boltId;
+    }
+
+    /** Returns the id of the group bolt described by this record. */
+    public String getBoltId() {
+        return this.boltId;
+    }
+
+    public void setBoltId(String boltId) {
+        this.boltId = boltId;
+    }
+
+    /** Returns the load value stored in this record. */
+    public double getLoad() {
+        return this.load;
+    }
+
+    public void setLoad(double load) {
+        this.load = load;
+    }
+
+    // Accessors of the disabled fields above:
+    // public Set<String> getStreams() {
+    //     return streams;
+    // }
+    // public Map<String, StreamFilter> getFilters() {
+    //     return filters;
+    // }
+    // public Map<String, List<StreamPartition>> getGroupByMeta() {
+    //     return groupByMeta;
+    // }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
new file mode 100644
index 0000000..6eb6195
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+
+/**
+ * Usage record of one topology: data sources, policies, bolt usages, streams, load.
+ * @since Mar 27, 2016
+ */
+public class TopologyUsage {
+    // topo info
+    private String topoName;
+    private final Set<String> datasources = new HashSet<>();
+    // usage information
+    private final Set<String> policies = new HashSet<>();
+    private final Map<String, AlertBoltUsage> alertUsages = new HashMap<>();
+    private final Map<String, GroupBoltUsage> groupUsages = new HashMap<>();
+    private final List<MonitoredStream> monitoredStream = new ArrayList<>();
+    private double load;
+
+    // This is to be the existing/previous meta-data.
+    // Only one group meta-data for all of the group bolts in this topology.
+
+    public TopologyUsage() {
+    }
+
+    public TopologyUsage(String name) {
+        topoName = name;
+    }
+
+    public String getTopoName() {
+        return this.topoName;
+    }
+
+    public void setTopoName(String topoId) {
+        topoName = topoId;
+    }
+
+    public Set<String> getDataSources() {
+        return this.datasources;
+    }
+
+    public Set<String> getPolicies() {
+        return this.policies;
+    }
+
+    public Map<String, AlertBoltUsage> getAlertUsages() {
+        return this.alertUsages;
+    }
+
+    /** Returns the usage stored under the given bolt id, or {@code null} if there is none. */
+    public AlertBoltUsage getAlertBoltUsage(String boltId) {
+        return this.alertUsages.get(boltId);
+    }
+
+    public Map<String, GroupBoltUsage> getGroupUsages() {
+        return this.groupUsages;
+    }
+
+    public List<MonitoredStream> getMonitoredStream() {
+        return this.monitoredStream;
+    }
+
+    /** Adds the monitored stream unless the list already contains an equal one. */
+    public void addMonitoredStream(MonitoredStream par) {
+        if (monitoredStream.contains(par)) {
+            return;
+        }
+        monitoredStream.add(par);
+    }
+
+    public double getLoad() {
+        return this.load;
+    }
+
+    public void setLoad(double load) {
+        this.load = load;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
new file mode 100644
index 0000000..84a4061
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.provider;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Plain in-memory {@link IScheduleContext}: every kind of metadata is held in a map.
+ * @since Mar 28, 2016
+ */
+public class InMemScheduleConext implements IScheduleContext {
+
+    private Map<String, Topology> topologies = new HashMap<>();
+    private Map<String, TopologyUsage> usages = new HashMap<>();
+    private Map<String, PolicyDefinition> policies = new HashMap<>();
+    private Map<String, Kafka2TupleMetadata> datasources = new HashMap<>();
+    private Map<String, PolicyAssignment> policyAssignments = new HashMap<>();
+    private Map<String, StreamDefinition> schemas = new HashMap<>();
+    private Map<StreamGroup, MonitoredStream> monitoredStreams = new HashMap<>();
+    private Map<String, Publishment> publishments = new HashMap<>();
+
+    public InMemScheduleConext() {
+    }
+    /** Copy constructor: copies each map of {@code context}; the map values are shared. */
+    public InMemScheduleConext(IScheduleContext context) {
+        this.topologies = new HashMap<String, Topology>(context.getTopologies());
+        this.usages = new HashMap<String, TopologyUsage>(context.getTopologyUsages());
+        this.policies = new HashMap<String, PolicyDefinition>(context.getPolicies());
+        this.datasources = new HashMap<String, Kafka2TupleMetadata>(context.getDataSourceMetadata());
+        this.policyAssignments = new HashMap<String, PolicyAssignment>(context.getPolicyAssignments());
+        this.schemas = new HashMap<String, StreamDefinition>(context.getStreamSchemas());
+        this.monitoredStreams = new HashMap<StreamGroup, MonitoredStream>(context.getMonitoredStreams());
+        this.publishments = new HashMap<String, Publishment>(context.getPublishments());
+    }
+    /** Keeps references to the given maps as they are; no copies are made. */
+    public InMemScheduleConext(Map<String, Topology> topologies2, Map<String, PolicyAssignment> assignments,
+            Map<String, Kafka2TupleMetadata> kafkaSources, Map<String, PolicyDefinition> policies2,
+            Map<String, Publishment> publishments2, Map<String, StreamDefinition> streamDefinitions,
+            Map<StreamGroup, MonitoredStream> monitoredStreamMap, Map<String, TopologyUsage> usages2) {
+        this.topologies = topologies2;
+        this.policyAssignments = assignments;
+        this.datasources = kafkaSources;
+        this.policies = policies2;
+        this.publishments = publishments2;
+        this.schemas = streamDefinitions;
+        this.monitoredStreams = monitoredStreamMap;
+        this.usages = usages2;
+    }
+    @Override
+    public Map<String, Topology> getTopologies() {
+        return topologies;
+    }
+
+    public void addTopology(Topology topo) {
+        topologies.put(topo.getName(), topo);
+    }
+    @Override
+    public Map<String, TopologyUsage> getTopologyUsages() {
+        return usages;
+    }
+
+    public void addTopologyUsages(TopologyUsage usage) {
+        usages.put(usage.getTopoName(), usage);
+    }
+    @Override
+    public Map<String, PolicyDefinition> getPolicies() {
+        return policies;
+    }
+
+    public void addPoilcy(PolicyDefinition pd) {
+        this.policies.put(pd.getName(), pd);
+    }
+
+    public Map<String, Kafka2TupleMetadata> getDatasources() {
+        return datasources;
+    }
+
+    public void setDatasources(Map<String, Kafka2TupleMetadata> datasources) {
+        this.datasources = datasources;
+    }
+
+    public void addDataSource(Kafka2TupleMetadata dataSource) {
+        this.datasources.put(dataSource.getName(), dataSource);
+    }
+
+    @Override
+    public Map<String, Kafka2TupleMetadata> getDataSourceMetadata() {
+        return datasources;
+    }
+
+    public void setPolicyOrderedTopologies(Map<String, PolicyAssignment> policyAssignments) {
+        this.policyAssignments = policyAssignments;
+    }
+    @Override
+    public Map<String, PolicyAssignment> getPolicyAssignments() {
+        return this.policyAssignments;
+    }
+
+    @Override
+    public Map<String, StreamDefinition> getStreamSchemas() {
+        return schemas;
+    }
+
+    public void addSchema(StreamDefinition schema) {
+        this.schemas.put(schema.getStreamId(), schema);
+    }
+
+    public void setStreamSchemas(Map<String, StreamDefinition> schemas) {
+        this.schemas = schemas;
+    }
+
+    @Override
+    public Map<StreamGroup, MonitoredStream> getMonitoredStreams() {
+        return monitoredStreams;
+    }
+
+    @Override
+    public Map<String, Publishment> getPublishments() {
+        return publishments;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
new file mode 100644
index 0000000..d4d6c0c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.provider;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.GroupBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * FIXME: this class focuses on correctness rather than efficiency for now. There
+ * might be problems when the metadata size grows too big.
+ *
+ * @since May 3, 2016
+ *
+ */
+public class ScheduleContextBuilder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ScheduleContextBuilder.class);
+ private static final String UNIQUE_BOLT_ID = "%s-%s";// toponame-boltname
+ // metadata service client that buildContext() reads all metadata from
+ private IMetadataServiceClient client;
+ // working state below is (re)assigned on every buildContext() call
+ private Map<String, Topology> topologies;
+ private Map<String, PolicyAssignment> assignments;
+ private Map<String, Kafka2TupleMetadata> kafkaSources;
+ private Map<String, PolicyDefinition> policies;
+ private Map<String, Publishment> publishments;
+ private Map<String, StreamDefinition> streamDefinitions;
+ private Map<StreamGroup, MonitoredStream> monitoredStreamMap;
+ private Map<String, TopologyUsage> usages;
+    /** Creates a builder whose metadata client is a MetadataServiceClientImpl built from {@code config}. */
+    public ScheduleContextBuilder(Config config) {
+        this.client = new MetadataServiceClientImpl(config);
+    }
+ /** Creates a builder that reads its metadata through the given client. */
+ public ScheduleContextBuilder(IMetadataServiceClient client) {
+ this.client = client;
+ }
+
+    /**
+     * Builds a schedule context from what the metadata service client returns.
+     * @return a new in-memory schedule context that holds the maps built here
+     */
+    public IScheduleContext buildContext() {
+        topologies = listToMap(client.listTopologies());
+        kafkaSources = listToMap(client.listDataSources());
+        policies = listToMap(client.listPolicies());
+        publishments = listToMap(client.listPublishment());
+        streamDefinitions = listToMap(client.listStreams());
+
+        // TODO: See ScheduleState comments on how to improve the storage
+        ScheduleState state = client.getVersionedSpec();
+        List<PolicyAssignment> liveAssignments = state == null
+                ? new ArrayList<PolicyAssignment>() : cleanupDeprecatedAssignments(state.getAssignments());
+        assignments = listToMap(liveAssignments);
+        // must run after 'assignments' is set: the stream cleanup reads and prunes that map
+        List<MonitoredStream> liveStreams = state == null
+                ? new ArrayList<MonitoredStream>() : cleanupDeprecatedStreamsAndAssignment(state.getMonitoredStreams());
+        monitoredStreamMap = listToMap(liveStreams);
+        // build the usages from the data loaded above, then copy everything to the schedule context
+        usages = buildTopologyUsage();
+        return new InMemScheduleConext(topologies, assignments, kafkaSources, policies, publishments,
+                streamDefinitions, monitoredStreamMap, usages);
+    }
+
+    /**
+     * Cleans up the given monitored streams and the current policy assignments.
+     * <pre>
+     * 1. Queues with a work slot on a missing topology or alert bolt are removed.
+     * 2. An assignment whose queue is unknown, or whose queue's stream group differs
+     *    from its policy's partition spec (the spec has changed), is removed.
+     * 3. A monitored stream that no remaining assignment refers to is removed.
+     * Removed assignments and removed unreferenced streams are logged.
+     * </pre>
+     * The cleanup works on a copy, so the given list is not structurally modified;
+     * queues are still removed from the MonitoredStream objects themselves.
+     *
+     * @param monitoredStreams the monitored streams taken from the schedule state
+     * @return the monitored streams that are still valid and referenced
+     */
+    private List<MonitoredStream> cleanupDeprecatedStreamsAndAssignment(List<MonitoredStream> monitoredStreams) {
+        List<MonitoredStream> result = new ArrayList<MonitoredStream>(monitoredStreams);
+
+        // clear deprecated streams on the copy, so the caller's list is not structurally modified
+        clearMonitoredStreams(result);
+
+        // build queueId -> streamGroup
+        Map<String, StreamGroup> queue2StreamGroup = new HashMap<String, StreamGroup>();
+        for (MonitoredStream ms : result) {
+            for (StreamWorkSlotQueue q : ms.getQueues()) {
+                queue2StreamGroup.put(q.getQueueId(), ms.getStreamGroup());
+            }
+        }
+
+        // decide the assignment delete set
+        Set<StreamGroup> usedGroups = new HashSet<StreamGroup>();
+        Set<String> toRemove = new HashSet<String>();
+        // check if queue is still referenced by policy assignments
+        for (PolicyAssignment assignment : assignments.values()) {
+            PolicyDefinition def = policies.get(assignment.getPolicyName());
+            StreamGroup group = queue2StreamGroup.get(assignment.getQueueId());
+            if (group == null || !Objects.equals(group.getStreamPartitions(), def.getPartitionSpec())) {
+                LOG.warn(" policy assgiment {} 's policy group {} is different to the monitored stream's partition group {}, "
+                        + "this indicates a policy stream partition spec change, the assignment would be removed! ",
+                        assignment, def.getPartitionSpec(), group == null ? "'not found'" : group.getStreamPartitions());
+                toRemove.add(assignment.getPolicyName());
+            } else {
+                usedGroups.add(group);
+            }
+        }
+
+        // remove the outdated assignments (the map is keyed by policy name)
+        assignments.keySet().removeAll(toRemove);
+        // remove non-referenced monitored streams
+        result.removeIf((t) -> {
+            boolean used = usedGroups.contains(t.getStreamGroup());
+            if (!used) {
+                LOG.warn("monitor stream with stream group {} is not referenced, "
+                        + "this monitored stream and its worker queu will be removed", t.getStreamGroup());
+            }
+            return !used;
+        });
+
+        return result;
+    }
+    /**
+     * Removes every queue that has a work slot on an unknown topology or an unknown
+     * alert bolt, then removes the monitored streams that are left without queues.
+     */
+    private void clearMonitoredStreams(List<MonitoredStream> monitoredStreams) {
+        for (Iterator<MonitoredStream> streamIt = monitoredStreams.iterator(); streamIt.hasNext();) {
+            MonitoredStream stream = streamIt.next();
+            // clean queue that underly topology is changed(removed/down)
+            for (Iterator<StreamWorkSlotQueue> queueIt = stream.getQueues().iterator(); queueIt.hasNext();) {
+                boolean deprecated = false;
+                for (WorkSlot ws : queueIt.next().getWorkingSlots()) {
+                    // check if topology available or bolt available
+                    boolean topologyKnown = topologies.containsKey(ws.topologyName);
+                    if (!topologyKnown || !topologies.get(ws.topologyName).getAlertBoltIds().contains(ws.boltId)) {
+                        deprecated = true;
+                        break;
+                    }
+                }
+                if (deprecated) {
+                    queueIt.remove();
+                }
+            }
+            // a stream that has no queue left is dropped as well
+            if (stream.getQueues().isEmpty()) {
+                streamIt.remove();
+            }
+        }
+    }
+    /** Returns a copy of {@code list} without the assignments whose policy is unknown. */
+    private List<PolicyAssignment> cleanupDeprecatedAssignments(List<PolicyAssignment> list) {
+        List<PolicyAssignment> result = new ArrayList<PolicyAssignment>(list);
+        // an assignment is kept only if the 'policies' map has an entry for its policy name
+        result.removeIf((assignment) -> {
+            boolean orphaned = !policies.containsKey(assignment.getPolicyName());
+            if (orphaned) {
+                LOG.info("Policy assignment {} 's policy not found, this assignment will be removed!", assignment);
+            }
+            return orphaned;
+        });
+        return result;
+    }
+    /**
+     * Indexes the elements by the key {@code getKey} derives; on equal keys the later one wins.
+     */
+    private <T, K> Map<K, T> listToMap(List<T> collections) {
+        Map<K, T> maps = new HashMap<K, T>(collections.size());
+        collections.forEach((t) -> maps.put(getKey(t), t));
+        return maps;
+    }
+    /** Derives the map key of a metadata object; extend this chain whenever a new class is indexed. */
+    @SuppressWarnings("unchecked")
+    private <T, K> K getKey(T t) {
+        Object key;
+        if (t instanceof Topology) {
+            key = ((Topology) t).getName();
+        } else if (t instanceof PolicyAssignment) {
+            key = ((PolicyAssignment) t).getPolicyName();
+        } else if (t instanceof Kafka2TupleMetadata) {
+            key = ((Kafka2TupleMetadata) t).getName();
+        } else if (t instanceof PolicyDefinition) {
+            key = ((PolicyDefinition) t).getName();
+        } else if (t instanceof Publishment) {
+            key = ((Publishment) t).getName();
+        } else if (t instanceof StreamDefinition) {
+            key = ((StreamDefinition) t).getStreamId();
+        } else if (t instanceof MonitoredStream) {
+            key = ((MonitoredStream) t).getStreamGroup();
+        } else {
+            throw new RuntimeException("unexpected key class " + t.getClass());
+        }
+        return (K) key;
+    }
+
+ /**
+  * Builds one {@code TopologyUsage} per entry of {@code topologies}, keyed by topology name.
+  * Each usage receives its group bolt usages, its alert bolt usages (completed through
+  * {@code addBoltUsageInfo}), the policies and monitored streams collected for the topology
+  * and the data sources resolved from those policies.
+  *
+  * @return a new map from topology name to its usage
+  */
+ private Map<String, TopologyUsage> buildTopologyUsage() {
+     // Lookup tables filled by preBuildQueue2TopoMap. The bolt-keyed tables simply assume
+     // that no two bolts share the same unique id.
+     Map<String, Set<MonitoredStream>> streamsByTopology = new HashMap<>();
+     Map<String, Set<String>> policiesByTopology = new HashMap<>();
+     Map<String, Set<String>> policiesByBolt = new HashMap<>();
+     Map<String, Set<StreamGroup>> partitionsByBolt = new HashMap<>();
+     Map<String, Set<String>> queueIdsByBolt = new HashMap<>();
+     Map<String, StreamWorkSlotQueue> queuesById = new HashMap<>();
+     preBuildQueue2TopoMap(streamsByTopology, policiesByTopology, policiesByBolt, partitionsByBolt,
+             queueIdsByBolt, queuesById);
+
+     Map<String, TopologyUsage> result = new HashMap<>();
+     for (Topology topology : topologies.values()) {
+         TopologyUsage usage = new TopologyUsage(topology.getName());
+         String topoName = usage.getTopoName();
+
+         // group bolt usages
+         for (String groupBoltId : topology.getGroupNodeIds()) {
+             usage.getGroupUsages().put(groupBoltId, new GroupBoltUsage(groupBoltId));
+         }
+
+         // alert bolt usages, completed with policies, partitions and queues of the bolt
+         for (String alertBoltId : topology.getAlertBoltIds()) {
+             AlertBoltUsage boltUsage = new AlertBoltUsage(alertBoltId);
+             usage.getAlertUsages().put(alertBoltId, boltUsage);
+             addBoltUsageInfo(policiesByBolt, partitionsByBolt, queueIdsByBolt,
+                     String.format(UNIQUE_BOLT_ID, topology.getName(), alertBoltId), boltUsage, queuesById);
+         }
+
+         // policies come from the policy assignments
+         if (policiesByTopology.containsKey(topoName)) {
+             usage.getPolicies().addAll(policiesByTopology.get(topoName));
+         }
+
+         // data sources are resolved from the policies, so this must follow the step above
+         buildTopologyDataSource(usage);
+
+         // monitored streams come from the slots of the monitored streams' queues
+         if (streamsByTopology.containsKey(topoName)) {
+             usage.getMonitoredStream().addAll(streamsByTopology.get(topoName));
+         }
+
+         result.put(topoName, usage);
+     }
+     return result;
+ }
+
+ /**
+  * Completes an alert bolt usage with the policies, partitions and queues that the lookup
+  * tables hold for the bolt's unique id. Tables without an entry for the bolt contribute
+  * nothing; a queue id missing from {@code queueMap} is logged as an error and skipped.
+  *
+  * @param bolt2Policies   policy names per unique bolt id
+  * @param bolt2Partition  stream groups per unique bolt id
+  * @param bolt2QueueIds   queue ids per unique bolt id
+  * @param uniqueAlertBolt the unique id of the bolt being completed
+  * @param alertUsage      the usage to fill
+  * @param queueMap        queues by queue id
+  */
+ private void addBoltUsageInfo(Map<String, Set<String>> bolt2Policies,
+         Map<String, Set<StreamGroup>> bolt2Partition, Map<String, Set<String>> bolt2QueueIds, String uniqueAlertBolt,
+         AlertBoltUsage alertUsage, Map<String, StreamWorkSlotQueue> queueMap) {
+     if (bolt2Policies.containsKey(uniqueAlertBolt)) {
+         Set<String> boltPolicies = bolt2Policies.get(uniqueAlertBolt);
+         alertUsage.getPolicies().addAll(boltPolicies);
+     }
+
+     if (bolt2Partition.containsKey(uniqueAlertBolt)) {
+         Set<StreamGroup> boltPartitions = bolt2Partition.get(uniqueAlertBolt);
+         alertUsage.getPartitions().addAll(boltPartitions);
+     }
+
+     if (!bolt2QueueIds.containsKey(uniqueAlertBolt)) {
+         return;
+     }
+     for (String queueId : bolt2QueueIds.get(uniqueAlertBolt)) {
+         if (!queueMap.containsKey(queueId)) {
+             LOG.error(" queue id {} not found in queue map !", queueId);
+             continue;
+         }
+         alertUsage.getReferQueues().add(queueMap.get(queueId));
+     }
+ }
+
+ /**
+  * Adds to the usage the data sources of every policy it references. A policy name that is
+  * not a key of {@code policies} is logged as an error and contributes nothing.
+  *
+  * @param u the topology usage whose policies are resolved and whose data sources are filled
+  */
+ private void buildTopologyDataSource(TopologyUsage u) {
+     for (String referencedPolicy : u.getPolicies()) {
+         PolicyDefinition definition = policies.get(referencedPolicy);
+         if (definition == null) {
+             LOG.error(" policy not find {}, but reference in topology usage {} !", referencedPolicy, u.getTopoName());
+             continue;
+         }
+         u.getDataSources().addAll(findDatasource(definition));
+     }
+ }
+
+ /**
+  * Collects the data source of every input stream of the policy, in input-stream order.
+  * An input stream without an entry in {@code streamDefinitions} is logged as an error and
+  * skipped.
+  *
+  * @param def the policy whose input streams are resolved
+  * @return a new list with one data source per resolvable input stream
+  */
+ private List<String> findDatasource(PolicyDefinition def) {
+     List<String> dataSources = new ArrayList<String>();
+     for (String streamId : def.getInputStreams()) {
+         StreamDefinition definition = this.streamDefinitions.get(streamId);
+         if (definition != null) {
+             dataSources.add(definition.getDataSource());
+             continue;
+         }
+         LOG.error("policy {} referenced stream definition {} not found in definiton !", def.getName(), streamId);
+     }
+     return dataSources;
+ }
+
+ /**
+  * Fills the lookup tables used to build the topology usages. The tables are populated from
+  * {@code assignments} (policies per queue) and from the work slots of every queue of every
+  * monitored stream in {@code monitoredStreamMap}.
+  *
+  * @param topo2MonitorStream out: monitored streams per topology name
+  * @param topo2Policies      out: policy names per topology name
+  * @param bolt2Policies      out: policy names per unique bolt id
+  * @param bolt2Partition     out: stream groups per unique bolt id
+  * @param bolt2QueueIds      out: queue ids per unique bolt id
+  * @param queueMap           out: queues by queue id
+  */
+ private void preBuildQueue2TopoMap(
+         Map<String, Set<MonitoredStream>> topo2MonitorStream,
+         Map<String, Set<String>> topo2Policies,
+         Map<String, Set<String>> bolt2Policies,
+         Map<String, Set<StreamGroup>> bolt2Partition,
+         Map<String, Set<String>> bolt2QueueIds,
+         Map<String, StreamWorkSlotQueue> queueMap) {
+     // Policies per queue id, derived from the policy assignments.
+     // Open question kept from the original author: why not reuse queue.getPolicies?
+     Map<String, Set<String>> queue2Policies = new HashMap<String, Set<String>>();
+     for (PolicyAssignment pa : assignments.values()) {
+         queue2Policies.computeIfAbsent(pa.getQueueId(), id -> new HashSet<String>()).add(pa.getPolicyName());
+     }
+
+     for (MonitoredStream stream : monitoredStreamMap.values()) {
+         for (StreamWorkSlotQueue q : stream.getQueues()) {
+             queueMap.put(q.getQueueId(), q);
+
+             // a queue without any assignment contributes an empty policy set
+             Set<String> policiesOnQ = queue2Policies.get(q.getQueueId());
+             if (policiesOnQ == null) {
+                 policiesOnQ = new HashSet<String>();
+             }
+
+             for (WorkSlot slot : q.getWorkingSlots()) {
+                 // both ids are needed for several tables; compute them once per slot
+                 String topologyName = slot.getTopologyName();
+                 String uniqueBoltId = getUniqueBoltId(slot);
+
+                 topo2MonitorStream.computeIfAbsent(topologyName, name -> new HashSet<MonitoredStream>())
+                         .add(stream);
+                 topo2Policies.computeIfAbsent(topologyName, name -> new HashSet<String>())
+                         .addAll(policiesOnQ);
+                 bolt2Policies.computeIfAbsent(uniqueBoltId, id -> new HashSet<String>())
+                         .addAll(policiesOnQ);
+                 bolt2QueueIds.computeIfAbsent(uniqueBoltId, id -> new HashSet<String>())
+                         .add(q.getQueueId());
+                 bolt2Partition.computeIfAbsent(uniqueBoltId, id -> new HashSet<StreamGroup>())
+                         .add(stream.getStreamGroup());
+             }
+         }
+     }
+ }
+
+ /**
+  * Formats the topology name and bolt id of the slot with the {@code UNIQUE_BOLT_ID} pattern.
+  */
+ private String getUniqueBoltId(WorkSlot slot) {
+     String topologyName = slot.getTopologyName();
+     String boltId = slot.getBoltId();
+     return String.format(UNIQUE_BOLT_ID, topologyName, boltId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
new file mode 100644
index 0000000..6b8495e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.resource;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordinator.Coordinator;
+import org.apache.eagle.alert.coordinator.ScheduleOption;
+import org.apache.eagle.alert.utils.JsonUtils;
+
+/**
+ * REST resource that provides API access to the coordinator even when ZK is not used as the
+ * intermediate access path.
+ * FIXME : more elegant status codes
+ *
+ * @since Mar 24, 2016 <br/>
+ */
+@Path("/coordinator")
+@Produces({ "application/json" })
+public class CoordinatorResource {
+
+    // NOTE(review): should the coordinator be wired through (Spring) configuration instead?
+    private Coordinator alertCoordinator = new Coordinator();
+
+    /**
+     * Returns the coordinator's current schedule state.
+     *
+     * @return the state serialized by {@code JsonUtils.writeValueAsString}
+     */
+    @GET
+    @Path("/assignments")
+    public String getAssignments() throws Exception {
+        return JsonUtils.writeValueAsString(alertCoordinator.getState());
+    }
+
+    /**
+     * Runs a schedule with a default {@link ScheduleOption} and returns the resulting state.
+     *
+     * @return the new state serialized by {@code JsonUtils.writeValueAsString}
+     */
+    @POST
+    @Path("/build")
+    public String build() throws Exception {
+        ScheduleState scheduled = alertCoordinator.schedule(new ScheduleOption());
+        return JsonUtils.writeValueAsString(scheduled);
+    }
+
+    /**
+     * Manually update the topology usages, for administration.
+     * Not implemented yet.
+     *
+     * @return currently always the empty string
+     */
+    @POST
+    @Path("/refreshUsages")
+    public String refreshUsages() {
+        // TODO
+        return "";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
new file mode 100644
index 0000000..d7881bd
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
@@ -0,0 +1,81 @@
+package org.apache.eagle.alert.coordinator.trigger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Polls the metadata service for policy changes and notifies the registered listeners.
+ *
+ * <p>Each {@link #run()} compares the policies currently listed by the client with the
+ * policies cached by the previous successful run, and reports added, removed and modified
+ * policy names to every listener.
+ */
+public class DynamicPolicyLoader implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
+
+    private final IMetadataServiceClient client;
+    // policies seen by the last run that completed its notifications; initially empty
+    private Map<String, PolicyDefinition> cachedPolicies = new HashMap<>();
+    // guarded by "this" for both registration and notification
+    private final List<PolicyChangeListener> listeners = new ArrayList<>();
+
+    /**
+     * @param client the metadata service client used to list the current policies
+     */
+    public DynamicPolicyLoader(IMetadataServiceClient client) {
+        this.client = client;
+    }
+
+    /**
+     * Registers a listener that is notified on every detected policy change.
+     *
+     * @param listener the listener to add
+     */
+    public synchronized void addPolicyChangeListener(PolicyChangeListener listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Loads the current policies, diffs them against the cache and notifies the listeners when
+     * anything was added, removed or modified. A policy counts as modified when its definition
+     * is not {@code equals} to the cached definition of the same name.
+     *
+     * <p>When it is run for the first time the cache is empty, so all existing policies are
+     * reported as added.
+     *
+     * <p>The cache is replaced only after all listeners returned. If loading or a listener
+     * throws, the error is logged, the cache stays unchanged and the same changes are reported
+     * again by the next run.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void run() {
+        // catch everything: an escaping exception would turn the polling thread into a zombie
+        try {
+            List<PolicyDefinition> current = client.listPolicies();
+            Map<String, PolicyDefinition> currPolicies = new HashMap<>();
+            current.forEach(pe -> currPolicies.put(pe.getName(), pe));
+
+            Collection<String> addedPolicies = CollectionUtils.subtract(currPolicies.keySet(), cachedPolicies.keySet());
+            Collection<String> removedPolicies = CollectionUtils.subtract(cachedPolicies.keySet(), currPolicies.keySet());
+            Collection<String> potentiallyModifiedPolicies = CollectionUtils.intersection(currPolicies.keySet(), cachedPolicies.keySet());
+
+            // a policy present in both snapshots is modified only if its definition differs
+            List<String> reallyModifiedPolicies = new ArrayList<>();
+            for (String updatedPolicy : potentiallyModifiedPolicies) {
+                if (!currPolicies.get(updatedPolicy).equals(cachedPolicies.get(updatedPolicy))) {
+                    reallyModifiedPolicies.add(updatedPolicy);
+                }
+            }
+
+            boolean policyChanged = !addedPolicies.isEmpty()
+                    || !removedPolicies.isEmpty()
+                    || !reallyModifiedPolicies.isEmpty();
+            if (!policyChanged) {
+                LOG.info("policy is not changed since last run");
+                return;
+            }
+
+            synchronized (this) {
+                for (PolicyChangeListener listener : listeners) {
+                    listener.onPolicyChange(current, addedPolicies, removedPolicies, reallyModifiedPolicies);
+                }
+            }
+
+            // reset cached policies
+            cachedPolicies = currPolicies;
+        } catch (Throwable t) {
+            LOG.error("error loading policy, but continue to run", t);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
new file mode 100644
index 0000000..8aa322f
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
@@ -0,0 +1,10 @@
+package org.apache.eagle.alert.coordinator.trigger;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+public interface PolicyChangeListener {
+ void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies, Collection<String> removedPolicies, Collection<String> modifiedPolicies);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf
new file mode 100644
index 0000000..51cc315
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf
@@ -0,0 +1,27 @@
+{
+ "coordinator" : {
+ "policiesPerBolt" : 5,
+ "boltParallelism" : 5,
+ "policyDefaultParallelism" : 5,
+ "boltLoadUpbound": 0.8,
+ "topologyLoadUpbound" : 0.8,
+ "numOfAlertBoltsPerTopology" : 5,
+ "zkConfig" : {
+ "zkQuorum" : "localhost:2181",
+ "zkRoot" : "/alert",
+ "zkSessionTimeoutMs" : 10000,
+ "connectionTimeoutMs" : 10000,
+ "zkRetryTimes" : 3,
+ "zkRetryInterval" : 3000
+ },
+ "metadataService" : {
+ "host" : "localhost",
+ "port" : 8080,
+ "context" : "/api"
+ },
+ "metadataDynamicCheck" : {
+ "initDelayMillis" : 1000,
+ "delayMillis" : 30000
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties
new file mode 100644
index 0000000..d4bc126
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..1aa925e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<web-app xmlns="http://java.sun.com/xml/ns/javaee"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
+ http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
+ version="3.0">
+ <welcome-file-list>
+ <welcome-file>index.html</welcome-file>
+ </welcome-file-list>
+ <servlet>
+ <servlet-name>Jersey Web Application</servlet-name>
+ <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
+ <init-param>
+ <param-name>com.sun.jersey.config.property.packages</param-name>
+ <param-value>io.swagger.jaxrs.json,io.swagger.jaxrs.listing,org.apache.eagle.alert.coordinator.resource,org.codehaus.jackson.jaxrs</param-value>
+ </init-param>
+ <init-param>
+ <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
+ <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter;com.sun.jersey.api.container.filter.PostReplaceFilter</param-value>
+ </init-param>
+ <init-param>
+ <param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
+ <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
+ </init-param>
+ <load-on-startup>1</load-on-startup>
+ </servlet>
+ <!-- Servlet for swagger initialization only, no URL mapping. -->
+ <servlet>
+ <servlet-name>swaggerConfig</servlet-name>
+ <servlet-class>io.swagger.jaxrs.config.DefaultJaxrsConfig</servlet-class>
+ <init-param>
+ <param-name>api.version</param-name>
+ <param-value>1.0.0</param-value>
+ </init-param>
+ <init-param>
+ <param-name>swagger.api.basepath</param-name>
+ <param-value>/api</param-value>
+ </init-param>
+ <load-on-startup>2</load-on-startup>
+ </servlet>
+
+ <servlet-mapping>
+ <servlet-name>Jersey Web Application</servlet-name>
+ <url-pattern>/api/*</url-pattern>
+ </servlet-mapping>
+ <filter>
+ <filter-name>CorsFilter</filter-name>
+ <!-- this should be replaced by tomcat ones, see also metadata resource -->
+ <filter-class>org.apache.eagle.alert.resource.SimpleCORSFiler</filter-class>
+ <init-param>
+ <param-name>cors.allowed.origins</param-name>
+ <param-value>*</param-value>
+ </init-param>
+ <init-param>
+ <param-name>cors.allowed.headers</param-name>
+ <param-value>Authorization,Origin, No-Cache, X-Requested-With, If-Modified-Since, Pragma, Last-Modified, Cache-Control, Expires, Content-Type, X-E4M-With, Accept</param-value>
+ </init-param>
+ <init-param>
+ <param-name>cors.allowed.methods</param-name>
+ <param-value>GET,POST,HEAD,OPTIONS,PUT,DELETE</param-value>
+ </init-param>
+ <init-param>
+ <param-name>cors.support.credentials</param-name>
+ <param-value>true</param-value>
+ </init-param>
+ </filter>
+ <filter-mapping>
+ <filter-name>CorsFilter</filter-name>
+ <url-pattern>/*</url-pattern>
+ </filter-mapping>
+</web-app>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html
new file mode 100644
index 0000000..1c4ea76
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html
@@ -0,0 +1,18 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+UMP Coordinator service!
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
new file mode 100644
index 0000000..78b72b3
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.eagle.alert.config.ConfigBusConsumer;
+import org.apache.eagle.alert.config.ConfigBusProducer;
+import org.apache.eagle.alert.config.ConfigChangeCallback;
+import org.apache.eagle.alert.config.ConfigValue;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.config.ZKConfigBuilder;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordinator.Coordinator;
+import org.apache.eagle.alert.coordinator.ScheduleOption;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Tests that a schedule built by {@link Coordinator} publishes its version to a config bus
+ * consumer listening on {@code topo1/spout}. The tests need the ZK quorum configured in
+ * {@code test-application.conf}.
+ *
+ * @since May 5, 2016
+ */
+public class CoordinatorTest {
+
+    /**
+     * Schedules against the real metadata service client and expects the published version
+     * to reach the consumer within one second. Ignored by default.
+     */
+    @SuppressWarnings({ "resource", "unused" })
+    @Ignore
+    @Test
+    public void test() throws Exception {
+        // configuration is prepared by before(), which JUnit runs ahead of every test
+        Config config = ConfigFactory.load().getConfig("coordinator");
+        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+        ConfigBusProducer producer = new ConfigBusProducer(zkConfig);
+        IMetadataServiceClient client = new MetadataServiceClientImpl(config);
+
+        Coordinator coordinator = new Coordinator(config, producer, client);
+        ScheduleOption option = new ScheduleOption();
+        ScheduleState state = coordinator.schedule(option);
+        String v = state.getVersion();
+
+        AtomicBoolean validated = new AtomicBoolean(false);
+        ConfigBusConsumer consumer = new ConfigBusConsumer(zkConfig, "topo1/spout", new ConfigChangeCallback() {
+            @Override
+            public void onNewConfig(ConfigValue value) {
+                String vId = value.getValue().toString();
+                Assert.assertEquals(v, vId);
+                validated.set(true);
+            }
+        });
+
+        Thread.sleep(1000);
+        Assert.assertTrue(validated.get());
+    }
+
+    /**
+     * Schedules against the sample metadata service and expects the published version to
+     * reach the consumer within three seconds.
+     */
+    @SuppressWarnings({ "resource", "unused" })
+    @Test
+    public void test_01() throws Exception {
+        // configuration is prepared by before(), which JUnit runs ahead of every test
+        Config config = ConfigFactory.load().getConfig("coordinator");
+        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+        ConfigBusProducer producer = new ConfigBusProducer(zkConfig);
+        IMetadataServiceClient client = ScheduleContextBuilderTest.getSampleMetadataService();
+
+        Coordinator coordinator = new Coordinator(config, producer, client);
+        ScheduleOption option = new ScheduleOption();
+        ScheduleState state = coordinator.schedule(option);
+        String v = state.getVersion();
+
+        // TODO : assert version
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean validated = new AtomicBoolean(false);
+        ConfigBusConsumer consumer = new ConfigBusConsumer(zkConfig, "topo1/spout", new ConfigChangeCallback() {
+            @Override
+            public void onNewConfig(ConfigValue value) {
+                String vId = value.getValue().toString();
+                // a failed assertion here leaves the latch untouched, so the test times out below
+                Assert.assertEquals(v, vId);
+                validated.set(true);
+                latch.countDown();
+            }
+        });
+
+        Assert.assertTrue("no matching config change received within 3 seconds",
+                latch.await(3, TimeUnit.SECONDS));
+        Assert.assertTrue(validated.get());
+    }
+
+    /**
+     * Starts the coordinator through its {@code main} entry point. Ignored by default.
+     */
+    @Ignore
+    @Test
+    public void test_main() throws Exception {
+        Coordinator.main(null);
+    }
+
+    /**
+     * Points the config library at {@code /test-application.conf} and reloads it, so that
+     * every test reads the test configuration.
+     */
+    @Before
+    public void before() {
+        System.setProperty("config.resource", "/test-application.conf");
+        ConfigFactory.invalidateCaches();
+        ConfigFactory.load().getConfig("coordinator");
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
new file mode 100644
index 0000000..155a9e5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.alert.coordinator;
+
+/**
+ * Placeholder for the {@code DynamicPolicyLoader} tests; no test cases are defined yet.
+ *
+ * Since 4/28/16.
+ */
+public class DynamicPolicyLoaderTest {
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
new file mode 100644
index 0000000..e2ea031
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import org.apache.eagle.alert.config.ConfigBusProducer;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.config.ZKConfigBuilder;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordinator.Coordinator;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Manual test that pushes schedule states through {@link MetadataServiceClientImpl}. It is
+ * ignored by default because it needs the services configured in
+ * {@code test-application.conf}.
+ *
+ * @since May 9, 2016
+ */
+public class MetadataServiceClientImplTest {
+
+    /**
+     * Adds a schedule state through the client, then posts a second version through
+     * {@code Coordinator.postSchedule}. The client is closed only after its last use.
+     */
+    @Ignore
+    @Test
+    public void addScheduleState() throws Exception {
+        ConfigFactory.invalidateCaches();
+        System.setProperty("config.resource", "/test-application.conf");
+        Config config = ConfigFactory.load("test-application.conf").getConfig("coordinator");
+        MetadataServiceClientImpl client = new MetadataServiceClientImpl(config);
+        try {
+            ScheduleState ss = new ScheduleState();
+            ss.setVersion("spec_version_1463764252582");
+
+            client.addScheduleState(ss);
+
+            ss.setVersion("spec_version_1464764252582");
+            ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+            ConfigBusProducer producer = new ConfigBusProducer(zkConfig);
+            // postSchedule still needs the client, so it must not be closed before this call
+            Coordinator.postSchedule(client, ss, producer);
+        } finally {
+            client.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
new file mode 100644
index 0000000..f2e67de
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.alert.coordinator.mock.InMemMetadataServiceClient;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn.Type;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * @since May 5, 2016
+ *
+ */
+public class ScheduleContextBuilderTest {
+
+ @Test
+ public void test() {
+ InMemMetadataServiceClient client = getSampleMetadataService();
+
+ ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+
+ IScheduleContext context = builder.buildContext();
+
+ // assert topology usage
+ Map<String, TopologyUsage> usages = context.getTopologyUsages();
+ Assert.assertEquals(1, usages.get(TOPO1).getMonitoredStream().size());
+ Assert.assertTrue(usages.get(TOPO1).getPolicies().contains(TEST_POLICY_1));
+
+ String alertBolt0 = TOPO1 + "-alert-" + "0";
+ String alertBolt1 = TOPO1 + "-alert-" + "1";
+ String alertBolt2 = TOPO1 + "-alert-" + "2";
+ for (AlertBoltUsage u : usages.get(TOPO1).getAlertUsages().values()) {
+ if (u.getBoltId().equals(alertBolt0) || u.getBoltId().equals(alertBolt1)
+ || u.getBoltId().equals(alertBolt2)) {
+ Assert.assertEquals(1, u.getPolicies().size());
+ Assert.assertTrue(u.getPolicies().contains(TEST_POLICY_1));
+ Assert.assertEquals(1, u.getPartitions().size());
+ Assert.assertEquals(1, u.getReferQueues().size());
+ }
+ }
+ }
+
+ @Test
+ public void test_remove_policy() {
+ InMemMetadataServiceClient client = getSampleMetadataService();
+ ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+
+ PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
+
+ IScheduleContext context = builder.buildContext();
+ Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+ StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
+
+ client.listPolicies().remove(0);
+ context = builder.buildContext();
+ Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+
+ WorkSlot slot = queue.getWorkingSlots().get(0);
+ Set<String> topoPolicies = context.getTopologyUsages().get(slot.topologyName).getPolicies();
+ Assert.assertFalse(topoPolicies.contains(TEST_DATASOURCE_1));
+ Assert.assertEquals(0, topoPolicies.size());
+ }
+
+ @Test
+ public void test_changed_policy() {
+ InMemMetadataServiceClient client = getSampleMetadataService();
+ ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+ PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
+
+ IScheduleContext context = builder.buildContext();
+ Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+
+ StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
+
+ PolicyDefinition pd1 = client.listPolicies().get(0);
+ // add a new group by column : need to replace the partiton spec, to
+ // avoid reference same object in
+ // on jvm (no serialization and deserialization)
+ StreamPartition par = new StreamPartition(pd1.getPartitionSpec().get(0));
+ par.getColumns().add("s1");
+ pd1.getPartitionSpec().clear();
+ pd1.getPartitionSpec().add(par);
+
+ context = builder.buildContext();
+
+ // assert the policy assignment is removed
+ Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+ // assert the monitored stream is removed as no policy on it now.
+ Assert.assertEquals(0, context.getMonitoredStreams().size());
+ // assert the topology usage doesn't contain policy
+ WorkSlot slot = queue.getWorkingSlots().get(0);
+ TopologyUsage topologyUsage = context.getTopologyUsages().get(slot.topologyName);
+ Set<String> topoPolicies = topologyUsage.getPolicies();
+ Assert.assertFalse(topoPolicies.contains(TEST_DATASOURCE_1));
+ Assert.assertEquals(0, topoPolicies.size());
+ // assert the topology usage doesn't contain the monitored stream
+ Assert.assertEquals(0, topologyUsage.getMonitoredStream().size());
+ // assert the alert bolt usage doesn't have the queue reference
+ Assert.assertEquals(0, topologyUsage.getAlertBoltUsage(slot.getBoltId()).getReferQueues().size());
+ }
+
+ @Test
+ public void test_renamed_topologies() {
+ InMemMetadataServiceClient client = getSampleMetadataService();
+ ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+
+ IScheduleContext context = builder.buildContext();
+ Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+
+ Topology t = client.listTopologies().get(0);
+ t.setName("newName");
+
+ context = builder.buildContext();
+ Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+ }
+
+ private static final String TOPO1 = "topo1";
+ private static final String V1 = "v1";
+ private static final String COL1 = "col1";
+ private static final String OUT_STREAM1 = "out-stream1";
+ private static final String TEST_POLICY_1 = "test-policy-1";
+ private static final String TEST_STREAM_DEF_1 = "testStreamDef";
+ private static final String TEST_DATASOURCE_1 = "test-datasource-1";
+ private static StreamPartition par;
+ private static String queueId;
+ private static StreamGroup streamGroup;
+
+ public static InMemMetadataServiceClient getSampleMetadataService() {
+ InMemMetadataServiceClient client = new InMemMetadataServiceClient();
+ client.listTopologies().add(createSampleTopology());
+ client.listDataSources().add(createKafka2TupleMetadata());
+ // client.listSpoutMetadata().add(createS)
+ client.listPolicies().add(createPolicy());
+ client.listPublishment().add(createPublishment());
+ client.listStreams().add(createStreamDefinition());
+ client.addScheduleState(createScheduleState());
+ return client;
+ }
+
+ private static ScheduleState createScheduleState() {
+ ScheduleState ss = new ScheduleState();
+ ss.setVersion(V1);
+
+ ss.getMonitoredStreams().add(createMonitoredStream());
+ ss.getAssignments().add(createAssignment());
+
+ return ss;
+ }
+
+ private static MonitoredStream createMonitoredStream() {
+ MonitoredStream ms = new MonitoredStream(streamGroup);
+ ms.setVersion(V1);
+
+ List<WorkSlot> slots = new ArrayList<WorkSlot>();
+ WorkSlot slot0 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 0);
+ WorkSlot slot1 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 1);
+ WorkSlot slot2 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 2);
+ slots.add(slot0);
+ slots.add(slot1);
+ slots.add(slot2);
+
+ StreamWorkSlotQueue q = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), slots);
+ ms.addQueues(q);
+ queueId = q.getQueueId();
+ return ms;
+ }
+
+ private static PolicyAssignment createAssignment() {
+ PolicyAssignment pa = new PolicyAssignment(TEST_POLICY_1, queueId);
+ return pa;
+ }
+
+ private static PolicyDefinition createPolicy() {
+ PolicyDefinition def = new PolicyDefinition();
+ def.setName(TEST_POLICY_1);
+ def.setInputStreams(Arrays.asList(TEST_STREAM_DEF_1));
+ def.setOutputStreams(Arrays.asList(OUT_STREAM1));
+ def.setParallelismHint(5);
+
+ streamGroup = new StreamGroup();
+ par = new StreamPartition();
+ par.setStreamId(TEST_STREAM_DEF_1);
+ par.getColumns().add(COL1);
+ StreamSortSpec sortSpec = new StreamSortSpec();
+// sortSpec.setColumn("col1");
+ sortSpec.setWindowMargin(3);
+ sortSpec.setWindowPeriod("PT1M");
+
+ par.setSortSpec(sortSpec);
+ streamGroup.addStreamPartition(par);
+
+ List<StreamPartition> lists = new ArrayList<StreamPartition>();
+ lists.add(par);
+ def.setPartitionSpec(lists);
+ return def;
+ }
+
+ private static StreamDefinition createStreamDefinition() {
+ StreamDefinition def = new StreamDefinition();
+ def.setStreamId(TEST_STREAM_DEF_1);
+ def.setDataSource(TEST_DATASOURCE_1);
+
+ StreamColumn col = new StreamColumn();
+ col.setName(COL1);
+ col.setRequired(true);
+ col.setType(Type.STRING);
+ def.getColumns().add(col);
+
+ return def;
+ }
+
+ private static Publishment createPublishment() {
+ Publishment pub = new Publishment();
+ pub.setType("KAFKA");
+ pub.setName("test-stream-output");
+ pub.setPolicyIds(Arrays.asList(TEST_POLICY_1));
+ return pub;
+ }
+
+ private static Kafka2TupleMetadata createKafka2TupleMetadata() {
+ Kafka2TupleMetadata ktm = new Kafka2TupleMetadata();
+ ktm.setName(TEST_DATASOURCE_1);
+ ktm.setSchemeCls("SchemeClass");
+ ktm.setTopic("tupleTopic");
+ ktm.setType("KAFKA");
+ ktm.setCodec(new Tuple2StreamMetadata());
+ return ktm;
+ }
+
+ private static Topology createSampleTopology() {
+ Topology t = new Topology(TOPO1, 3, 10);
+ for (int i = 0; i < t.getNumOfGroupBolt(); i++) {
+ t.getGroupNodeIds().add(t.getName() + "-grp-" + i);
+ }
+ for (int i = 0; i < t.getNumOfAlertBolt(); i++) {
+ t.getAlertBoltIds().add(t.getName() + "-alert-" + i);
+ }
+ return t;
+ }
+
+}