You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/11/08 13:17:02 UTC

incubator-eagle git commit: [EAGLE-751] update policyDefinition#equal

Repository: incubator-eagle
Updated Branches:
  refs/heads/master f21145635 -> 02cbdb68d


[EAGLE-751] update policyDefinition#equal

https://issues.apache.org/jira/browse/EAGLE-751

Author: Zhao, Qingwen <qi...@apache.org>

Closes #622 from qingwen220/EAGEL-751.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/02cbdb68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/02cbdb68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/02cbdb68

Branch: refs/heads/master
Commit: 02cbdb68d677e1d286149caad8ca7efcb64489ee
Parents: f211456
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Tue Nov 8 21:16:53 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Tue Nov 8 21:16:53 2016 +0800

----------------------------------------------------------------------
 ...e.alert.app.AlertUnitTopologyAppProvider.xml |  8 ++--
 .../engine/coordinator/PolicyDefinition.java    | 16 ++++---
 .../eagle/alert/model/TestPolicyDefinition.java | 45 ++++++++++++++++++++
 .../engine/spout/KafkaMessageIdWrapper.java     | 10 ++++-
 .../spout/SpoutOutputCollectorWrapper.java      |  1 +
 .../org/apache/eagle/common/DateTimeUtil.java   |  2 +-
 6 files changed, 68 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/02cbdb68/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
index 06ccaea..3c8d58e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
@@ -66,10 +66,10 @@
             <required>false</required>
         </property>
         <property>
-            <name>topology.messageTimeoutSecs</name>
-            <displayName>Message Timeout Seconds</displayName>
-            <value>3600</value>
-            <description>Number of tuple timeout in seconds</description>
+            <name>topology.message.timeout.secs</name>
+            <displayName>topology message timeout (secs)</displayName>
+            <description>default timeout is 30s</description>
+            <value>30</value>
             <required>false</required>
         </property>
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/02cbdb68/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index cfd7fef..c131e12 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -125,12 +125,13 @@ public class PolicyDefinition implements Serializable {
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
-            .append(name)
-            .append(inputStreams)
-            .append(outputStreams)
-            .append(definition)
-            .append(partitionSpec)
-            // .append(parallelismHint)
+                .append(name)
+                .append(inputStreams)
+                .append(outputStreams)
+                .append(definition)
+                .append(partitionSpec)
+                .append(policyStatus)
+                .append(parallelismHint)
             .build();
     }
 
@@ -153,7 +154,8 @@ public class PolicyDefinition implements Serializable {
             && (another.definition != null && another.definition.equals(this.definition))
             && Objects.equals(this.definition, another.definition)
             && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
-             && another.parallelismHint == this.parallelismHint
+                && another.policyStatus.equals(this.policyStatus)
+                && another.parallelismHint == this.parallelismHint
             ) {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/02cbdb68/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java
new file mode 100644
index 0000000..1cc063d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.model;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestPolicyDefinition {
+
+    @Test
+    public void testEqual() {
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        PolicyDefinition policy1 = new PolicyDefinition();
+        policy1.setName("policy1");
+        policy1.setDefinition(definition);
+
+        PolicyDefinition policy2 = new PolicyDefinition();
+        policy2.setName("policy1");
+        policy2.setPolicyStatus(PolicyDefinition.PolicyStatus.DISABLED);
+        policy2.setDefinition(definition);
+
+        PolicyDefinition policy3 = new PolicyDefinition();
+        policy3.setName("policy1");
+        policy3.setDefinition(definition);
+
+        Assert.assertTrue(policy1.equals(policy3));
+        Assert.assertFalse(policy1.equals(policy2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/02cbdb68/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
index ccd3493..74dea03 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
@@ -20,23 +20,29 @@ package org.apache.eagle.alert.engine.spout;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.eagle.common.DateTimeUtil;
 
 /**
  * Created on 2/18/16.
  */
 public class KafkaMessageIdWrapper {
     public Object id;
+    public String topic;
+    public long timestamp;
 
     public KafkaMessageIdWrapper(Object o) {
         this.id = o;
     }
 
-    public String topic;
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
     public String toString() {
         try {
-            return String.format("KafkaMessageIdWrapper[topic=%s, id=%s]", topic, objectMapper.writeValueAsString(id));
+            return String.format("KafkaMessageIdWrapper[topic=%s, id=%s, timestamp=%s %s]",
+                    topic,
+                    objectMapper.writeValueAsString(id),
+                    DateTimeUtil.millisecondsToHumanDateWithSeconds(timestamp),
+                    DateTimeUtil.CURRENT_TIME_ZONE.getID());
         } catch (JsonProcessingException e) {
             throw new IllegalStateException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/02cbdb68/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
index e205da4..4545fa8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
@@ -96,6 +96,7 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
 
         KafkaMessageIdWrapper newMessageId = new KafkaMessageIdWrapper(messageId);
         newMessageId.topic = topic;
+        newMessageId.timestamp = System.currentTimeMillis();
         /**
          phase 1: tuple to stream converter
          if this topic multiplexes multiple streams, then retrieve the individual streams.

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/02cbdb68/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
index 74b733d..8d6814b 100644
--- a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
@@ -35,7 +35,7 @@ public class DateTimeUtil {
     public static final long ONEMINUTE = 1L * 60L * 1000L;
     public static final long ONEHOUR = 1L * 60L * 60L * 1000L;
     public static final long ONEDAY = 24L * 60L * 60L * 1000L;
-    private static TimeZone CURRENT_TIME_ZONE;
+    public static TimeZone CURRENT_TIME_ZONE;
 
     static {
         Config config = ConfigFactory.load();