You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/11/08 13:17:02 UTC
incubator-eagle git commit: [EAGLE-751] update policyDefinition#equal
Repository: incubator-eagle
Updated Branches:
refs/heads/master f21145635 -> 02cbdb68d
[EAGLE-751] update policyDefinition#equal
https://issues.apache.org/jira/browse/EAGLE-751
Author: Zhao, Qingwen <qi...@apache.org>
Closes #622 from qingwen220/EAGEL-751.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/02cbdb68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/02cbdb68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/02cbdb68
Branch: refs/heads/master
Commit: 02cbdb68d677e1d286149caad8ca7efcb64489ee
Parents: f211456
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Tue Nov 8 21:16:53 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Tue Nov 8 21:16:53 2016 +0800
----------------------------------------------------------------------
...e.alert.app.AlertUnitTopologyAppProvider.xml | 8 ++--
.../engine/coordinator/PolicyDefinition.java | 16 ++++---
.../eagle/alert/model/TestPolicyDefinition.java | 45 ++++++++++++++++++++
.../engine/spout/KafkaMessageIdWrapper.java | 10 ++++-
.../spout/SpoutOutputCollectorWrapper.java | 1 +
.../org/apache/eagle/common/DateTimeUtil.java | 2 +-
6 files changed, 68 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/02cbdb68/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
index 06ccaea..3c8d58e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
@@ -66,10 +66,10 @@
<required>false</required>
</property>
<property>
- <name>topology.messageTimeoutSecs</name>
- <displayName>Message Timeout Seconds</displayName>
- <value>3600</value>
- <description>Number of tuple timeout in seconds</description>
+ <name>topology.message.timeout.secs</name>
+ <displayName>topology message timeout (secs)</displayName>
+ <description>default timeout is 30s</description>
+ <value>30</value>
<required>false</required>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/02cbdb68/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index cfd7fef..c131e12 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -125,12 +125,13 @@ public class PolicyDefinition implements Serializable {
@Override
public int hashCode() {
return new HashCodeBuilder()
- .append(name)
- .append(inputStreams)
- .append(outputStreams)
- .append(definition)
- .append(partitionSpec)
- // .append(parallelismHint)
+ .append(name)
+ .append(inputStreams)
+ .append(outputStreams)
+ .append(definition)
+ .append(partitionSpec)
+ .append(policyStatus)
+ .append(parallelismHint)
.build();
}
@@ -153,7 +154,8 @@ public class PolicyDefinition implements Serializable {
&& (another.definition != null && another.definition.equals(this.definition))
&& Objects.equals(this.definition, another.definition)
&& CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
- && another.parallelismHint == this.parallelismHint
+ && another.policyStatus.equals(this.policyStatus)
+ && another.parallelismHint == this.parallelismHint
) {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/02cbdb68/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java
new file mode 100644
index 0000000..1cc063d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.model;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestPolicyDefinition {
+
+ @Test
+ public void testEqual() {
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ PolicyDefinition policy1 = new PolicyDefinition();
+ policy1.setName("policy1");
+ policy1.setDefinition(definition);
+
+ PolicyDefinition policy2 = new PolicyDefinition();
+ policy2.setName("policy1");
+ policy2.setPolicyStatus(PolicyDefinition.PolicyStatus.DISABLED);
+ policy2.setDefinition(definition);
+
+ PolicyDefinition policy3 = new PolicyDefinition();
+ policy3.setName("policy1");
+ policy3.setDefinition(definition);
+
+ Assert.assertTrue(policy1.equals(policy3));
+ Assert.assertFalse(policy1.equals(policy2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/02cbdb68/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
index ccd3493..74dea03 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
@@ -20,23 +20,29 @@ package org.apache.eagle.alert.engine.spout;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.eagle.common.DateTimeUtil;
/**
* Created on 2/18/16.
*/
public class KafkaMessageIdWrapper {
public Object id;
+ public String topic;
+ public long timestamp;
public KafkaMessageIdWrapper(Object o) {
this.id = o;
}
- public String topic;
private static final ObjectMapper objectMapper = new ObjectMapper();
public String toString() {
try {
- return String.format("KafkaMessageIdWrapper[topic=%s, id=%s]", topic, objectMapper.writeValueAsString(id));
+ return String.format("KafkaMessageIdWrapper[topic=%s, id=%s, timestamp=%s %s]",
+ topic,
+ objectMapper.writeValueAsString(id),
+ DateTimeUtil.millisecondsToHumanDateWithSeconds(timestamp),
+ DateTimeUtil.CURRENT_TIME_ZONE.getID());
} catch (JsonProcessingException e) {
throw new IllegalStateException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/02cbdb68/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
index e205da4..4545fa8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
@@ -96,6 +96,7 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
KafkaMessageIdWrapper newMessageId = new KafkaMessageIdWrapper(messageId);
newMessageId.topic = topic;
+ newMessageId.timestamp = System.currentTimeMillis();
/**
phase 1: tuple to stream converter
if this topic multiplexes multiple streams, then retrieve the individual streams.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/02cbdb68/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
index 74b733d..8d6814b 100644
--- a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
@@ -35,7 +35,7 @@ public class DateTimeUtil {
public static final long ONEMINUTE = 1L * 60L * 1000L;
public static final long ONEHOUR = 1L * 60L * 60L * 1000L;
public static final long ONEDAY = 24L * 60L * 60L * 1000L;
- private static TimeZone CURRENT_TIME_ZONE;
+ public static TimeZone CURRENT_TIME_ZONE;
static {
Config config = ConfigFactory.load();