You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/01/12 08:47:56 UTC
[3/8] incubator-eagle git commit: EAGLE-79 Provide aggregation and
persistence DSL support
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DynamicPolicyLoader.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DynamicPolicyLoader.java
new file mode 100644
index 0000000..7cb8e1f
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DynamicPolicyLoader.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import com.netflix.config.*;
+import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+import com.typesafe.config.Config;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *
+ * @param <T>
+ */
+public class DynamicPolicyLoader<T extends AbstractPolicyDefinitionEntity> {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
+
+ private final int defaultInitialDelayMillis = 30*1000;
+ private final int defaultDelayMillis = 60*1000;
+ private final boolean defaultIgnoreDeleteFromSource = true;
+ /**
+ * one alertExecutor may have multiple instances, that is why there is a list of PolicyLifecycleMethods
+ */
+ private volatile CopyOnWriteHashMap<String, List<PolicyLifecycleMethods<T>>> policyChangeListeners = new CopyOnWriteHashMap<>();
+ private volatile CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>> policyDistributionUpdaters = new CopyOnWriteHashMap<>();
+ private static DynamicPolicyLoader instance = new DynamicPolicyLoader();
+ private volatile boolean initialized = false;
+
+ public void addPolicyChangeListener(String alertExecutorId, PolicyLifecycleMethods<T> alertExecutor){
+ synchronized(policyChangeListeners) {
+ if (policyChangeListeners.get(alertExecutorId) == null) {
+ policyChangeListeners.put(alertExecutorId, new ArrayList<PolicyLifecycleMethods<T>>());
+ }
+ policyChangeListeners.get(alertExecutorId).add(alertExecutor);
+ }
+ }
+
+ private static ConcurrentHashMap<Class, DynamicPolicyLoader> maps = new ConcurrentHashMap<Class, DynamicPolicyLoader>();
+ public void addPolicyDistributionReporter(String alertExecutorId, PolicyDistributionReportMethods policyDistUpdater){
+ synchronized(policyDistributionUpdaters) {
+ if(policyDistributionUpdaters.get(alertExecutorId) == null) {
+ policyDistributionUpdaters.put(alertExecutorId, new ArrayList<PolicyDistributionReportMethods>());
+ }
+ policyDistributionUpdaters.get(alertExecutorId).add(policyDistUpdater);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <K extends AbstractPolicyDefinitionEntity> DynamicPolicyLoader<K> getInstanceOf(Class<K> clz) {
+ if (maps.containsKey(clz)) {
+ return maps.get(clz);
+ } else {
+ DynamicPolicyLoader<K> loader = new DynamicPolicyLoader<K>();
+ maps.putIfAbsent(clz, loader);
+ return maps.get(clz);
+ }
+ }
+
+ /**
+ * singleton with init would be good for unit test as well, and it ensures that
+ * initialization happens only once before you use it.
+ * @param config
+ * @param dao
+ */
+ public void init(Map<String, Map<String, T>> initialAlertDefs,
+ PolicyDefinitionDAO<T> dao, Config config){
+ if(!initialized){
+ synchronized(this){
+ if(!initialized){
+ internalInit(initialAlertDefs, dao, config);
+ initialized = true;
+ }
+ }
+ }
+ }
+
+ /**
+ * map from alertExecutorId+partitionId to AlertExecutor which implements PolicyLifecycleMethods
+ * @param initialAlertDefs
+ * @param dao
+ * @param config
+ */
+ private void internalInit(Map<String, Map<String, T>> initialAlertDefs,
+ PolicyDefinitionDAO<T> dao, Config config){
+ if(!config.getBoolean("dynamicConfigSource.enabled")) {
+ return;
+ }
+ AbstractPollingScheduler scheduler = new FixedDelayPollingScheduler(
+ config.getInt("dynamicConfigSource.initDelayMillis"),
+ config.getInt("dynamicConfigSource.delayMillis"),
+ false
+ );
+
+ scheduler.addPollListener(new PollListener(){
+ @SuppressWarnings("unchecked")
+ @Override
+ public void handleEvent(EventType eventType, PollResult lastResult,
+ Throwable exception) {
+ if (lastResult == null) {
+ LOG.error("The lastResult is null, something must be wrong, probably the eagle service is dead!");
+ throw new RuntimeException("The lastResult is null, probably the eagle service is dead! ", exception);
+ }
+ Map<String, Object> added = lastResult.getAdded();
+ Map<String, Object> changed = lastResult.getChanged();
+ Map<String, Object> deleted = lastResult.getDeleted();
+ for(Map.Entry<String, List<PolicyLifecycleMethods<T>>> entry : policyChangeListeners.entrySet()){
+ String alertExecutorId = entry.getKey();
+ for (PolicyLifecycleMethods<T> policyLifecycleMethod : entry.getValue()) {
+ Map<String, T> addedPolicies = (Map<String, T>)added.get(trimPartitionNum(alertExecutorId));
+ if(addedPolicies != null && addedPolicies.size() > 0){
+ policyLifecycleMethod.onPolicyCreated(addedPolicies);
+ }
+ Map<String, T> changedPolicies = (Map<String, T>)changed.get(trimPartitionNum(alertExecutorId));
+ if(changedPolicies != null && changedPolicies.size() > 0){
+ policyLifecycleMethod.onPolicyChanged(changedPolicies);
+ }
+ Map<String, T> deletedPolicies = (Map<String, T>)deleted.get(trimPartitionNum(alertExecutorId));
+ if(deletedPolicies != null && deletedPolicies.size() > 0){
+ policyLifecycleMethod.onPolicyDeleted(deletedPolicies);
+ }
+ }
+ }
+
+ // notify policyDistributionUpdaters
+ for(Map.Entry<String, List<PolicyDistributionReportMethods>> entry : policyDistributionUpdaters.entrySet()){
+ for(PolicyDistributionReportMethods policyDistributionUpdateMethod : entry.getValue()){
+ policyDistributionUpdateMethod.report();
+ }
+ }
+ }
+ private String trimPartitionNum(String alertExecutorId){
+ int i = alertExecutorId.lastIndexOf('_');
+ if(i != -1){
+ return alertExecutorId.substring(0, i);
+ }
+ return alertExecutorId;
+ }
+ });
+
+ ConcurrentCompositeConfiguration finalConfig = new ConcurrentCompositeConfiguration();
+
+ PolledConfigurationSource source = new DynamicPolicySource<T>(initialAlertDefs, dao, config);
+
+ try{
+ DynamicConfiguration dbSourcedConfiguration = new DynamicConfiguration(source, scheduler);
+ finalConfig.addConfiguration(dbSourcedConfiguration);
+ }catch(Exception ex){
+ LOG.warn("Fail loading from DB, continue without DB sourced configuration", ex);
+ }
+ }
+
+ public static class DynamicPolicySource<M extends AbstractPolicyDefinitionEntity> implements PolledConfigurationSource {
+ private static Logger LOG = LoggerFactory.getLogger(DynamicPolicySource.class);
+ private Config config;
+ private PolicyDefinitionDAO<M> dao;
+ /**
+ * mapping from alertExecutorId to list of policies
+ */
+ private Map<String, Map<String, M>> cachedAlertDefs;
+
+ public DynamicPolicySource(Map<String, Map<String, M>> initialAlertDefs, PolicyDefinitionDAO<M> dao, Config config){
+ this.cachedAlertDefs = initialAlertDefs;
+ this.dao = dao;
+ this.config = config;
+ }
+
+ public PollResult poll(boolean initial, Object checkPoint) throws Exception {
+ LOG.info("Poll policy from eagle service " + config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST) +
+ ":" + config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT) );
+ Map<String, Map<String, M>> newAlertDefs =
+ dao.findActivePoliciesGroupbyExecutorId(config.getString("eagleProps.site"),
+ config.getString("eagleProps.dataSource"));
+
+ // compare runtime alertDefs with cachedAlertDefs and figure out what are added/deleted/updated
+ Map<String, Object> added = new HashMap<String, Object>();
+ Map<String, Object> changed = new HashMap<String, Object>();
+ Map<String, Object> deleted = new HashMap<String, Object>();
+
+ Set<String> newAlertExecutorIds = newAlertDefs.keySet();
+ Set<String> cachedAlertExecutorIds = cachedAlertDefs.keySet();
+
+ // dynamically adding new alert executor is not supported, because alert executor is pre-built while program starts up
+ Collection<String> addedAlertExecutorIds = CollectionUtils.subtract(newAlertExecutorIds, cachedAlertExecutorIds);
+ if(addedAlertExecutorIds != null && addedAlertExecutorIds.size() > 0){
+ LOG.warn("New alertExecutorIds are found : " + addedAlertExecutorIds);
+ }
+
+ // if one alert executor is missing, it means all policy under that alert executor should be removed
+ Collection<String> deletedAlertExecutorIds = CollectionUtils.subtract(cachedAlertExecutorIds, newAlertExecutorIds);
+ if(deletedAlertExecutorIds != null && deletedAlertExecutorIds.size() > 0){
+ LOG.warn("Some alertExecutorIds are deleted : " + deletedAlertExecutorIds);
+ for(String deletedAlertExecutorId : deletedAlertExecutorIds){
+ deleted.put(deletedAlertExecutorId, cachedAlertDefs.get(deletedAlertExecutorId));
+ }
+ }
+
+ // we need calculate added/updated/deleted policy for all executors which are not deleted
+// Collection<String> updatedAlertExecutorIds = CollectionUtils.intersection(newAlertExecutorIds, cachedAlertExecutorIds);
+ Collection<String> updatedAlertExecutorIds = newAlertExecutorIds;
+ for(String updatedAlertExecutorId : updatedAlertExecutorIds){
+ Map<String, M> newPolicies = newAlertDefs.get(updatedAlertExecutorId);
+ Map<String, M> cachedPolicies = cachedAlertDefs.get(updatedAlertExecutorId);
+ PolicyComparator.compare(updatedAlertExecutorId, newPolicies, cachedPolicies, added, changed, deleted);
+ }
+
+ cachedAlertDefs = newAlertDefs;
+
+ return PollResult.createIncremental(added, changed, deleted, new Date().getTime());
+ }
+ }
+
+ public static class PolicyComparator {
+
+ public static <M extends AbstractPolicyDefinitionEntity> void compare(String alertExecutorId, Map<String, M> newPolicies, Map<String, M> cachedPolicies,
+ Map<String, Object> added, Map<String, Object> changed, Map<String, Object> deleted){
+ Set<String> newPolicyIds = newPolicies.keySet();
+ Set<String> cachedPolicyIds = cachedPolicies != null ? cachedPolicies.keySet() : new HashSet<String>();
+ Collection<String> addedPolicyIds = CollectionUtils.subtract(newPolicyIds, cachedPolicyIds);
+ Collection<String> deletedPolicyIds = CollectionUtils.subtract(cachedPolicyIds, newPolicyIds);
+ Collection<String> changedPolicyIds = CollectionUtils.intersection(cachedPolicyIds, newPolicyIds);
+ if(addedPolicyIds != null && addedPolicyIds.size() > 0){
+ Map<String, M> tmp = new HashMap<String, M>();
+ for(String addedPolicyId : addedPolicyIds){
+ tmp.put(addedPolicyId, newPolicies.get(addedPolicyId));
+ }
+ added.put(alertExecutorId, tmp);
+ }
+ if(deletedPolicyIds != null && deletedPolicyIds.size() > 0){
+ Map<String, M> tmp = new HashMap<String, M>();
+ for(String deletedPolicyId : deletedPolicyIds){
+ tmp.put(deletedPolicyId, cachedPolicies.get(deletedPolicyId));
+ }
+ deleted.put(alertExecutorId, tmp);
+ }
+ if(changedPolicyIds != null && changedPolicyIds.size() > 0){
+ Map<String, M> tmp = new HashMap<String, M>();
+ for(String changedPolicyId : changedPolicyIds){
+ // check if policy is really changed
+ if(!newPolicies.get(changedPolicyId).equals(cachedPolicies.get(changedPolicyId))){
+ tmp.put(changedPolicyId, newPolicies.get(changedPolicyId));
+ }
+ }
+ changed.put(alertExecutorId, tmp);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PartitionUtils.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PartitionUtils.java
new file mode 100644
index 0000000..bc9a13f
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PartitionUtils.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+
+public class PartitionUtils {
+
+ public static boolean accept(AlertDefinitionAPIEntity alertDef, PolicyPartitioner partitioner, int numPartitions, int partitionSeq){
+ int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(Constants.POLICY_TYPE), alertDef.getTags().get(Constants.POLICY_ID));
+ if(targetPartitionSeq == partitionSeq)
+ return true;
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistStatsDAOLogReporter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistStatsDAOLogReporter.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistStatsDAOLogReporter.java
new file mode 100644
index 0000000..30868f3
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistStatsDAOLogReporter.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.policy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+/**
+ * just append log
+ */
+public class PolicyDistStatsDAOLogReporter implements PolicyDistributionStatsDAO{
+ private static Logger LOG = LoggerFactory.getLogger(PolicyDistStatsDAOLogReporter.class);
+
+ @Override
+ public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
+ if(policyIds != null){
+ StringBuilder sb = new StringBuilder();
+ sb.append("policyDistirbutionStats for " + policyGroupId + "[" + "total: " + policyIds.size() + ", ");
+ for(String policyId : policyIds){
+ sb.append(policyId + ",");
+ }
+ sb.append("]");
+ LOG.info(sb.toString());
+ }else{
+ LOG.warn("No policies are assigned to " + policyGroupId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionReportMethods.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionReportMethods.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionReportMethods.java
new file mode 100644
index 0000000..7181857
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionReportMethods.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.policy;
+
+/**
+ * framework will call report method, it is AlertExecutor's responsibility to report policy distribution information
+ */
+public interface PolicyDistributionReportMethods {
+ void report();
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStats.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStats.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStats.java
new file mode 100644
index 0000000..7a70c95
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStats.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.policy;
+
+/**
+ * fields for a policy distribution statistics
+ */
+public class PolicyDistributionStats {
+ private String policyGroupId; // normally groupId is alertExecutorId
+ private String policyId;
+ private boolean markDown; // true if this policy is marked down, false otherwise
+ private double weight; // comprehensive factors for policy overhead
+
+ public String getPolicyId() {
+ return policyId;
+ }
+
+ public void setPolicyId(String policyId) {
+ this.policyId = policyId;
+ }
+
+ public boolean isMarkDown() {
+ return markDown;
+ }
+
+ public void setMarkDown(boolean markDown) {
+ this.markDown = markDown;
+ }
+
+ public double getWeight() {
+ return weight;
+ }
+
+ public void setWeight(double weight) {
+ this.weight = weight;
+ }
+
+ public String getPolicyGroupId() {
+ return policyGroupId;
+ }
+
+ public void setPolicyGroupId(String policyGroupId) {
+ this.policyGroupId = policyGroupId;
+ }
+
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("policyId:");
+ sb.append(policyId);
+ sb.append(", markDown:");
+ sb.append(markDown);
+ sb.append(", weight:");
+ sb.append(weight);
+
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStatsDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStatsDAO.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStatsDAO.java
new file mode 100644
index 0000000..12b2b83
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStatsDAO.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.policy;
+
+import java.util.Set;
+
+public interface PolicyDistributionStatsDAO {
+ public void reportPolicyMembership(String policyGroupId, Set<String> policyIds);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistroStatsLogReporter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistroStatsLogReporter.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistroStatsLogReporter.java
new file mode 100644
index 0000000..c4033f0
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistroStatsLogReporter.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.policy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+/**
+ * just append log
+ */
+public class PolicyDistroStatsLogReporter implements PolicyDistributionStatsDAO{
+ private static Logger LOG = LoggerFactory.getLogger(PolicyDistroStatsLogReporter.class);
+
+ @Override
+ public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
+ if(policyIds != null){
+ StringBuilder sb = new StringBuilder();
+ sb.append("policyDistributionStats for " + policyGroupId +", total: " + policyIds.size() + ", [");
+ for(String policyId : policyIds){
+ sb.append(policyId + ",");
+ }
+ if(policyIds.size() > 0){
+ sb.deleteCharAt(sb.length()-1);
+ }
+ sb.append("]");
+ LOG.info(sb.toString());
+ }else{
+ LOG.warn("No policies are assigned to " + policyGroupId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java
new file mode 100644
index 0000000..7dad895
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.policy.executor.IPolicyExecutor;
+
+public class PolicyEvaluationContext<T extends AbstractPolicyDefinitionEntity, K> {
+
+ public IPolicyExecutor<T, K> alertExecutor;
+
+ public String policyId;
+
+ public PolicyEvaluator<T> evaluator;
+
+ public Collector outputCollector;
+
+ public ResultRender<T, K> resultRender;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java
new file mode 100644
index 0000000..8dce80b
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.dataproc.core.ValuesArray;
+
+/***
+ *
+ * @param <T> - The policy definition entity
+ */
+public interface PolicyEvaluator<T extends AbstractPolicyDefinitionEntity> {
+ /**
+ * take input and evaluate expression
+ * input has 3 fields, first is siddhiAlertContext, second one is streamName, the third is map of attribute name/value
+ * @param input
+ * @throws Exception
+ */
+ public void evaluate(ValuesArray input) throws Exception;
+
+ /**
+ * notify policy evaluator that policy is updated
+ */
+ public void onPolicyUpdate(T newAlertDef);
+
+ /**
+ * notify policy evaluator that policy is deleted, here is cleanup work for this policy evaluator
+ */
+ public void onPolicyDelete();
+
+ /**
+ * get additional context
+ */
+ public Map<String, String> getAdditionalContext();
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluatorServiceProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluatorServiceProvider.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluatorServiceProvider.java
new file mode 100644
index 0000000..f862883
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluatorServiceProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import java.util.List;
+
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+
+import com.fasterxml.jackson.databind.Module;
+
+/**
+ * to provide extensibility, we need a clear differentiation between framework job and provider logic
+ * policy evaluator framework:
+ * - connect to eagle data source
+ * - read all policy definitions
+ * - compare with cached policy definitions
+ * - figure out if policy is created, deleted or updated
+ * - if policy is created, then invoke onPolicyCreated
+ * - if policy is deleted, then invoke onPolicyDeleted
+ * - if policy is updated, then invoke onPolicyUpdated
+ * - for policy report, replace old evaluator engine with new evaluator engine which is created by policy evaluator provider
+ * - specify # of executors for this alert executor id
+ * - dynamically balance # of policies evaluated by each alert executor
+ * - use zookeeper to balance. eaglePolicies/${alertExecutorId}/${alertExecutorInstanceId} => list of policies
+ *
+ * policy evaluator business features:
+ * - register mapping between policy type and PolicyEvaluator
+ * - create evaluator engine runtime when configuration is changed
+ *
+ */
+public interface PolicyEvaluatorServiceProvider<T extends AbstractPolicyDefinitionEntity> {
+ String getPolicyType();
+ Class<? extends PolicyEvaluator<T>> getPolicyEvaluator();
+ List<Module> getBindingModules();
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyLifecycleMethods.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyLifecycleMethods.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyLifecycleMethods.java
new file mode 100644
index 0000000..ad5d5c9
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyLifecycleMethods.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+
+public interface PolicyLifecycleMethods<T extends AbstractPolicyDefinitionEntity> {
+ void onPolicyCreated(Map<String, T> added);
+ void onPolicyChanged(Map<String, T> changed);
+ void onPolicyDeleted(Map<String, T> deleted);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyManager.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyManager.java
new file mode 100644
index 0000000..3f3581d
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyManager.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.Module;
+
+public class PolicyManager {
+ private final static Logger LOG = LoggerFactory.getLogger(PolicyManager.class);
+ private static PolicyManager instance = new PolicyManager();
+
+ private ServiceLoader<PolicyEvaluatorServiceProvider> loader;
+
+ private Map<String, Class<? extends PolicyEvaluator>> policyEvaluators = new HashMap<String, Class<? extends PolicyEvaluator>>();
+ private Map<String, List<Module>> policyModules = new HashMap<String, List<Module>>();
+
+ private PolicyManager(){
+ loader = ServiceLoader.load(PolicyEvaluatorServiceProvider.class);
+ Iterator<PolicyEvaluatorServiceProvider> iter = loader.iterator();
+ while(iter.hasNext()){
+ PolicyEvaluatorServiceProvider factory = iter.next();
+ LOG.info("Supported policy type : " + factory.getPolicyType());
+ policyEvaluators.put(factory.getPolicyType(), factory.getPolicyEvaluator());
+ policyModules.put(factory.getPolicyType(), factory.getBindingModules());
+ }
+ }
+
+ public static PolicyManager getInstance(){
+ return instance;
+ }
+
+ public Class<? extends PolicyEvaluator> getPolicyEvaluator(String policyType){
+ return policyEvaluators.get(policyType);
+ }
+
+ public List<Module> getPolicyModules(String policyType){
+ return policyModules.get(policyType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java
new file mode 100644
index 0000000..fa9620c
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import java.io.Serializable;
+
+/**
+ * partition policies so that policies can be distributed into different alert evaluators
+ */
+public interface PolicyPartitioner extends Serializable {
+ int partition(int numTotalPartitions, String policyType, String policyId);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java
new file mode 100644
index 0000000..cc59880
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+
+import java.util.List;
+
+/**
+ * @since Dec 17, 2015
+ *
+ */
+public interface ResultRender<T extends AbstractPolicyDefinitionEntity, K> {
+
+ K render(Config config, List<Object> rets, PolicyEvaluationContext<T, K> siddhiAlertContext, long timestamp);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
new file mode 100644
index 0000000..0499431
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.common;
+
+import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
+
+public class Constants {
+ public final static String ALERT_SERVICE_ENDPOINT_NAME = "AlertService";
+ public final static String ALERT_DEFINITION_SERVICE_ENDPOINT_NAME = "AlertDefinitionService";
+ public final static String ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME = "AlertStreamSchemaService";
+ public final static String ALERT_DATA_SOURCE_SERVICE_ENDPOINT_NAME = "AlertDataSourceService";
+ public final static String ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME = "AlertExecutorService";
+ public final static String ALERT_STREAM_SERVICE_ENDPOINT_NAME = "AlertStreamService";
+ public static final String ALERT_EMAIL_ORIGIN_PROPERTY = "alertEmailOrigin";
+ public static final String ALERT_TIMESTAMP_PROPERTY = "alertTimestamp";
+
+ public final static String AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME = "AggregateDefinitionService";
+
+ public static final String ALERT_EMAIL_TIME_PROPERTY = "timestamp";
+ public static final String ALERT_EMAIL_COUNT_PROPERTY = "count";
+ public static final String ALERT_EMAIL_ALERTLIST_PROPERTY = "alertList";
+
+ public static final String URL = "url";
+ public static final String ALERT_SOURCE = "alertSource";
+ public static final String ALERT_MESSAGE = "alertMessage";
+ public static final String SUBJECT = "subject";
+ public static final String ALERT_EXECUTOR_ID = PolicyDefinitionEntityDAOImpl.ALERT_EXECUTOR_ID;// "alertExecutorId";
+ public static final String POLICY_NAME = "policyName";
+ public static final String POLICY_ID = "policyId";
+ public static final String SOURCE_STREAMS = "sourceStreams";
+ public static final String ALERT_EVENT = "alertEvent";
+ public static final String POLICY_DETAIL_URL = "policyDetailUrl";
+ public static final String ALERT_DETAIL_URL = "alertDetailUrl";
+
+ public static final String POLICY_DEFINITION = "policyDefinition";
+ public static final String POLICY_TYPE = "policyType";
+ public static final String STREAM_NAME = "streamName";
+ public static final String ATTR_NAME = "attrName";
+
+ public static final String ALERT_EXECUTOR_CONFIGS = "alertExecutorConfigs";
+ public static final String PARALLELISM = "parallelism";
+ public static final String PARTITIONER = "partitioner";
+ public static final String SOURCE = "source";
+ public static final String PARTITIONSEQ = "partitionSeq";
+ // policy definition status
+ public static final String EAGLE_DEFAULT_POLICY_NAME = "eagleQuery";
+
+ public enum policyType {
+ siddhiCEPEngine,
+ MachineLearning;
+
+ policyType() {
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/UrlBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/UrlBuilder.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/UrlBuilder.java
new file mode 100644
index 0000000..b10c6c8
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/UrlBuilder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.policy.common;
+
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.common.EagleBase64Wrapper;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.eagle.log.entity.RowkeyBuilder;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.mortbay.util.UrlEncoded;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UrlBuilder {
+
+ private static final Logger logger = LoggerFactory.getLogger(UrlBuilder.class);
+
+ public static String getEncodedRowkey(AlertAPIEntity entity) throws Exception {
+ InternalLog log = HBaseInternalLogHelper.convertToInternalLog(entity, EntityDefinitionManager.getEntityDefinitionByEntityClass(entity.getClass()));
+ return EagleBase64Wrapper.encodeByteArray2URLSafeString(RowkeyBuilder.buildRowkey(log));
+ }
+
+ public static String buildAlertDetailUrl(String host, int port, AlertAPIEntity entity) {
+ String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/alertDetail/";
+ try {
+ return baseUrl + UrlEncoded.encodeString(getEncodedRowkey(entity));
+ }
+ catch (Exception ex) {
+ logger.error("Fail to populate encodedRowkey for alert Entity" + entity.toString());
+ return "N/A";
+ }
+ }
+
+ public static String buiildPolicyDetailUrl(String host, int port, Map<String, String> tags) {
+ String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/policyDetail?";
+ String format = "policy=%s&site=%s&executor=%s";
+ String policy = tags.get(Constants.POLICY_ID);
+ String site = tags.get(EagleConfigConstants.SITE);
+ String alertExecutorID = tags.get(Constants.ALERT_EXECUTOR_ID);
+ if (policy != null && site != null && alertExecutorID != null) {
+ return baseUrl + String.format(format, policy, site, alertExecutorID);
+ }
+ return "N/A";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/config/AbstractPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/config/AbstractPolicyDefinition.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/config/AbstractPolicyDefinition.java
new file mode 100644
index 0000000..a0999e6
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/config/AbstractPolicyDefinition.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.config;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * base fields for all policy definition
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible=true)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AbstractPolicyDefinition {
+ private String type;
+ /**
+ * @return type in string
+ */
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * @param type set type value
+ */
+ public void setType(String type) {
+ this.type = type;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAO.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAO.java
new file mode 100644
index 0000000..650defc
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAO.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import org.apache.eagle.alert.entity.AlertDataSourceEntity;
+
+import java.util.List;
+
+public interface AlertDataSourceDAO {
+ List<AlertDataSourceEntity> findAlertDataSourceBySite(String site) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAOImpl.java
new file mode 100644
index 0000000..ca037a3
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAOImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertDataSourceEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.apache.commons.lang.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class AlertDataSourceDAOImpl implements AlertDataSourceDAO{
+ private final Logger LOG = LoggerFactory.getLogger(AlertDataSourceDAOImpl.class);
+ private final EagleServiceConnector connector;
+
+ public AlertDataSourceDAOImpl(EagleServiceConnector connector){
+ this.connector = connector;
+ }
+
+ @Override
+ public List<AlertDataSourceEntity> findAlertDataSourceBySite(String site) throws Exception{
+ try {
+ IEagleServiceClient client = new EagleServiceClientImpl(connector);
+ String query = Constants.ALERT_STREAM_SERVICE_ENDPOINT_NAME + "[@site=\"" + site + "\"]{*}";
+ GenericServiceAPIResponseEntity<AlertDataSourceEntity> response = client.search()
+ .startTime(0)
+ .endTime(10 * DateUtils.MILLIS_PER_DAY)
+ .pageSize(Integer.MAX_VALUE)
+ .query(query)
+ .send();
+ client.close();
+ if (response.getException() != null) {
+ throw new Exception("Got an exception when query eagle service: " + response.getException());
+ }
+ return response.getObj();
+ }
+ catch (Exception ex) {
+ LOG.error("Got an exception when query stream metadata service ", ex);
+ throw new IllegalStateException(ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java
new file mode 100644
index 0000000..39c5284
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods to load alert definitions for a program
+ */
+public class AlertDefinitionDAOImpl implements PolicyDefinitionDAO<AlertDefinitionAPIEntity> {
+
+ private static final long serialVersionUID = 7717408104714443056L;
+ private static final Logger LOG = LoggerFactory.getLogger(AlertDefinitionDAOImpl.class);
+ private final EagleServiceConnector connector;
+
+ public AlertDefinitionDAOImpl(EagleServiceConnector connector){
+ this.connector = connector;
+ }
+
+ @Override
+ public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) throws Exception {
+ try {
+ IEagleServiceClient client = new EagleServiceClientImpl(connector);
+ String query = Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME + "[@site=\"" + site + "\" AND @dataSource=\"" + dataSource + "\"]{*}";
+ GenericServiceAPIResponseEntity<AlertDefinitionAPIEntity> response = client.search()
+ .pageSize(Integer.MAX_VALUE)
+ .query(query)
+ .send();
+ client.close();
+ if (response.getException() != null) {
+ throw new Exception("Got an exception when query eagle service: " + response.getException());
+ }
+ List<AlertDefinitionAPIEntity> list = response.getObj();
+ List<AlertDefinitionAPIEntity> enabledList = new ArrayList<AlertDefinitionAPIEntity>();
+ for (AlertDefinitionAPIEntity entity : list) {
+ if (entity.isEnabled()) enabledList.add(entity);
+ }
+ return enabledList;
+ }
+ catch (Exception ex) {
+ LOG.error("Got an exception when query alert Def service", ex);
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ @Override
+ public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception {
+ List<AlertDefinitionAPIEntity> list = findActivePolicies(site, dataSource);
+ Map<String, Map<String, AlertDefinitionAPIEntity>> map = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
+ for (AlertDefinitionAPIEntity entity : list) {
+ String executorID = entity.getTags().get(Constants.ALERT_EXECUTOR_ID);
+ if (map.get(executorID) == null) {
+ map.put(executorID, new HashMap<String, AlertDefinitionAPIEntity>());
+ }
+ map.get(executorID).put(entity.getTags().get("policyId"), entity);
+ }
+ return map;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAO.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAO.java
new file mode 100644
index 0000000..09c6266
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAO.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import org.apache.eagle.alert.entity.AlertExecutorEntity;
+
+import java.util.List;
+
+public interface AlertExecutorDAO {
+ List<AlertExecutorEntity> findAlertExecutorByDataSource(String dataSource) throws Exception;
+ List<AlertExecutorEntity> findAlertExecutor(String dataSource, String alertExecutorId) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAOImpl.java
new file mode 100644
index 0000000..0b7b9e3
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAOImpl.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import java.util.List;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertExecutorEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertExecutorDAOImpl implements AlertExecutorDAO{
+
+ private static final Logger LOG = LoggerFactory.getLogger(AlertExecutorDAOImpl.class);
+
+ private final EagleServiceConnector connector;
+
+ public AlertExecutorDAOImpl(EagleServiceConnector connector){
+ this.connector = connector;
+ }
+
+ @Override
+ public List<AlertExecutorEntity> findAlertExecutorByDataSource(String dataSource) throws Exception{
+ try {
+ IEagleServiceClient client = new EagleServiceClientImpl(connector);
+ String query = Constants.ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME + "[@dataSource=\"" + dataSource + "\"]{*}";
+ GenericServiceAPIResponseEntity<AlertExecutorEntity> response = client.search()
+ .startTime(0)
+ .endTime(10 * DateUtils.MILLIS_PER_DAY)
+ .pageSize(Integer.MAX_VALUE)
+ .query(query)
+ .send();
+ client.close();
+ if (response.getException() != null) {
+ throw new Exception("Got an exception when query eagle service: " + response.getException());
+ }
+ return response.getObj();
+ }
+ catch (Exception ex) {
+ LOG.error("Got an exception when query stream metadata service ", ex);
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ @Override
+ public List<AlertExecutorEntity> findAlertExecutor(String dataSource, String alertExecutorId) throws Exception{
+ try {
+ IEagleServiceClient client = new EagleServiceClientImpl(connector);
+ String query = Constants.ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME + "[@dataSource=\"" + dataSource + "\""
+ + " AND @alertExecutorId=\"" + alertExecutorId + "\""
+ + "]{*}";
+ GenericServiceAPIResponseEntity<AlertExecutorEntity> response = client.search()
+ .startTime(0)
+ .endTime(10 * DateUtils.MILLIS_PER_DAY)
+ .pageSize(Integer.MAX_VALUE)
+ .query(query)
+ .send();
+ client.close();
+ if (response.getException() != null) {
+ throw new Exception("Got an exception when query eagle service: " + response.getException());
+ }
+ return response.getObj();
+ }
+ catch (Exception ex) {
+ LOG.error("Got an exception when query stream metadata service ", ex);
+ throw new IllegalStateException(ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAO.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAO.java
new file mode 100644
index 0000000..fd3dbb9
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAO.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import org.apache.eagle.alert.entity.AlertStreamEntity;
+
+import java.util.List;
+
+public interface AlertStreamDAO {
+ List<AlertStreamEntity> findAlertStreamByDataSource(String dataSource) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAOImpl.java
new file mode 100644
index 0000000..537a62f
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAOImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertStreamEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.apache.commons.lang.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class AlertStreamDAOImpl implements AlertStreamDAO{
+ private final Logger LOG = LoggerFactory.getLogger(AlertStreamDAOImpl.class);
+ private final EagleServiceConnector connector;
+
+ public AlertStreamDAOImpl(EagleServiceConnector connector){
+ this.connector = connector;
+ }
+
+ public List<AlertStreamEntity> findAlertStreamByDataSource(String dataSource) throws Exception{
+ try {
+ IEagleServiceClient client = new EagleServiceClientImpl(connector);
+ String query = Constants.ALERT_STREAM_SERVICE_ENDPOINT_NAME + "[@dataSource=\"" + dataSource + "\"]{*}";
+ GenericServiceAPIResponseEntity<AlertStreamEntity> response = client.search()
+ .startTime(0)
+ .endTime(10 * DateUtils.MILLIS_PER_DAY)
+ .pageSize(Integer.MAX_VALUE)
+ .query(query)
+ .send();
+ client.close();
+ if (response.getException() != null) {
+ throw new Exception("Got an exception when query eagle service: " + response.getException());
+ }
+ return response.getObj();
+ }
+ catch (Exception ex) {
+ LOG.error("Got an exception when query stream metadata service ", ex);
+ throw new IllegalStateException(ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAO.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAO.java
new file mode 100644
index 0000000..95a2186
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAO.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import java.util.List;
+
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+
+public interface AlertStreamSchemaDAO {
+ List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAOImpl.java
new file mode 100644
index 0000000..af1fb3a
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAOImpl.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import java.util.List;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+public class AlertStreamSchemaDAOImpl implements AlertStreamSchemaDAO {
+ private final Logger LOG = LoggerFactory.getLogger(AlertStreamSchemaDAOImpl.class);
+
+ private final String eagleServiceHost;
+ private final Integer eagleServicePort;
+ private String username;
+ private String password;
+
+ public AlertStreamSchemaDAOImpl(String eagleServiceHost, Integer eagleServicePort) {
+ this(eagleServiceHost, eagleServicePort, null, null);
+ }
+
+ public AlertStreamSchemaDAOImpl(String eagleServiceHost, Integer eagleServicePort, String username, String password) {
+ this.eagleServiceHost = eagleServiceHost;
+ this.eagleServicePort = eagleServicePort;
+ this.username = username;
+ this.password = password;
+ }
+
+ public AlertStreamSchemaDAOImpl(Config config) {
+ this.eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+ this.eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+ if (config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) &&
+ config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD)) {
+ this.username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+ this.password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+ }
+ }
+
+ @Override
+ public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception {
+ try {
+ IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
+ String query = Constants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME + "[@dataSource=\"" + dataSource + "\"]{*}";
+ GenericServiceAPIResponseEntity<AlertStreamSchemaEntity> response = client.search()
+ .startTime(0)
+ .endTime(10 * DateUtils.MILLIS_PER_DAY)
+ .pageSize(Integer.MAX_VALUE)
+ .query(query)
+ .send();
+ client.close();
+ if (response.getException() != null) {
+ throw new Exception("Got an exception when query eagle service: " + response.getException());
+ }
+ return response.getObj();
+ }
+ catch (Exception ex) {
+ LOG.error("Got an exception when query stream metadata service ", ex);
+ throw new IllegalStateException(ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionDAO.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionDAO.java
new file mode 100644
index 0000000..74c167b
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionDAO.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+
+/**
+ * @param <T> - Policy definition type
+ */
+public interface PolicyDefinitionDAO<T extends AbstractPolicyDefinitionEntity> extends Serializable{
+ /**
+ * find list of active alert definitions for one specific site and dataSource
+ * @return
+ */
+ List<T> findActivePolicies(String site, String dataSource) throws Exception;
+
+ /**
+ * find map from alertExecutorId to map from policy Id to alert definition for one specific site and dataSource
+ * Map from alertExecutorId to map from policyId to policy definition
+ (site,dataSource) => Map[alertExecutorId,Map[policyId,alertDefinition]]
+ * @return
+ */
+ Map<String, Map<String, T>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionEntityDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionEntityDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionEntityDAOImpl.java
new file mode 100644
index 0000000..c1bd24e
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionEntityDAOImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @since Dec 17, 2015
+ *
+ */
+public class PolicyDefinitionEntityDAOImpl<T extends AbstractPolicyDefinitionEntity> implements PolicyDefinitionDAO<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(PolicyDefinitionEntityDAOImpl.class);
+ public static final String ALERT_EXECUTOR_ID = "alertExecutorId";
+ private final EagleServiceConnector connector;
+ private final String servicePointName;
+
+ public PolicyDefinitionEntityDAOImpl(EagleServiceConnector connector, String serviceName){
+ this.connector = connector;
+ this.servicePointName = serviceName;
+ }
+
+ @Override
+ public List<T> findActivePolicies(String site, String dataSource) throws Exception {
+ try {
+ IEagleServiceClient client = new EagleServiceClientImpl(connector);
+ String query = servicePointName + "[@site=\"" + site + "\" AND @dataSource=\"" + dataSource + "\"]{*}";
+ GenericServiceAPIResponseEntity<T> response = client.search()
+ .pageSize(Integer.MAX_VALUE)
+ .query(query)
+ .send();
+ client.close();
+ if (response.getException() != null) {
+ throw new Exception("Got an exception when query eagle service: " + response.getException());
+ }
+ List<T> list = response.getObj();
+ List<T> enabledList = new ArrayList<T>();
+ for (T entity : list) {
+ if (entity.isEnabled()) enabledList.add(entity);
+ }
+ return enabledList;
+ }
+ catch (Exception ex) {
+ LOG.error("Got an exception when query alert Def service", ex);
+ throw new IllegalStateException(ex);
+ }
+ }
+
+
+
+ @Override
+ public Map<String, Map<String, T>> findActivePoliciesGroupbyExecutorId(String site, String dataSource)
+ throws Exception {
+ List<T> list = findActivePolicies(site, dataSource);
+ Map<String, Map<String, T>> map = new HashMap<String, Map<String, T>>();
+ for (T entity : list) {
+ // support both executorId and legacy alertExecutorId
+ String executorID = entity.getTags().containsKey("executorId") ? entity.getTags().get("executorId")
+ : entity.getTags().get(ALERT_EXECUTOR_ID);
+
+ if (map.get(executorID) == null) {
+ map.put(executorID, new HashMap<String, T>());
+ }
+ map.get(executorID).put(entity.getTags().get("policyId"), entity);
+ }
+ return map;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java
new file mode 100644
index 0000000..c9d28a2
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.executor;
+
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.policy.siddhi.SiddhiEvaluationHandler;
+
+/**
+ * Created on 1/10/16.
+ */
+public interface IPolicyExecutor<T extends AbstractPolicyDefinitionEntity, K> extends SiddhiEvaluationHandler<T, K> {
+ String getExecutorId();
+
+ int getPartitionSeq();
+}