You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/09 13:56:06 UTC
[2/2] incubator-eagle git commit: [EAGLE-749] Add unit test for model.
[EAGLE-749] Add unit test for model.
- Add unit tests for the model classes under the alert-common module.
- Fix inconsistency between equals() and hashCode() for PartitionedEvent, StreamEvent, and Publishment.
- Fix bug in StreamColumn where the default value of a BOOL column was parsed with Double.valueOf instead of Boolean.valueOf.
https://issues.apache.org/jira/browse/EAGLE-749
Author: r7raul1984 <ta...@yhd.com>
Closes #632 from r7raul1984/EAGLE-749.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/2b61cef5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/2b61cef5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/2b61cef5
Branch: refs/heads/master
Commit: 2b61cef585b02fb6f40d5c3de7f7c5c060926ecd
Parents: 1da8dc4
Author: r7raul1984 <ta...@yhd.com>
Authored: Wed Nov 9 21:55:51 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Nov 9 21:55:51 2016 +0800
----------------------------------------------------------------------
.../alert/engine/coordinator/Publishment.java | 21 +-
.../alert/engine/coordinator/StreamColumn.java | 2 +-
.../engine/coordinator/StreamSortSpec.java | 17 +-
.../alert/engine/model/PartitionedEvent.java | 16 +-
.../eagle/alert/engine/model/StreamEvent.java | 11 +-
.../eagle/alert/config/TestConfigBus.java | 5 +-
.../model/Kafka2TupleMetadataTest.java | 49 +++++
.../model/PolicyWorkerQueueTest.java | 63 ++++++
.../model/StreamRepartitionStrategyTest.java | 74 +++++++
.../model/StreamRouterSpecTest.java | 59 ++++++
.../model/Tuple2StreamMetadataTest.java | 50 +++++
.../alert/coordination/model/WorkSlotTest.java | 43 +++++
.../model/internal/MonitoredStreamTest.java | 69 +++++++
.../model/internal/PolicyAssignmentTest.java | 37 ++++
.../model/internal/StreamGroupTest.java | 67 +++++++
.../model/internal/StreamWorkSlotQueueTest.java | 61 ++++++
.../model/internal/TopologyTest.java | 47 +++++
.../OverrideDeduplicatorSpecTest.java | 61 ++++++
.../coordinator/PolicyDefinitionTest.java | 140 ++++++++++++++
.../engine/coordinator/PublishmentTest.java | 100 ++++++++++
.../engine/coordinator/PublishmentTypeTest.java | 47 +++++
.../engine/coordinator/StreamColumnTest.java | 153 +++++++++++++++
.../coordinator/StreamDefinitionTest.java | 52 +++++
.../engine/coordinator/StreamPartitionTest.java | 43 +++++
.../engine/coordinator/StreamSortSpecTest.java | 45 +++++
.../coordinator/StreamingClusterTest.java | 47 +++++
.../engine/model/AlertPublishEventTest.java | 110 +++++++++++
.../engine/model/AlertStreamEventTest.java | 63 ++++++
.../engine/model/PartitionedEventTest.java | 54 ++++++
.../engine/model/StreamEventBuilderTest.java | 166 ++++++++++++++++
.../alert/engine/model/StreamEventTest.java | 191 +++++++++++++++++++
.../eagle/alert/model/StreamEventTest.java | 68 -------
.../eagle/alert/model/TestPolicyDefinition.java | 45 -----
33 files changed, 1930 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index dbb1844..d1cc33a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -135,28 +135,29 @@ public class Publishment {
if (obj instanceof Publishment) {
Publishment p = (Publishment) obj;
return (Objects.equals(name, p.getName()) && Objects.equals(type, p.getType())
- && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
- && Objects.equals(dedupFields, p.getDedupFields())
- && Objects.equals(dedupStateField, p.getDedupStateField())
- && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator())
- && Objects.equals(policyIds, p.getPolicyIds())
- && Objects.equals(streamIds, p.getStreamIds())
- && properties.equals(p.getProperties()));
+ && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
+ && Objects.equals(dedupFields, p.getDedupFields())
+ && Objects.equals(dedupStateField, p.getDedupStateField())
+ && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator())
+ && Objects.equals(policyIds, p.getPolicyIds())
+ && Objects.equals(streamIds, p.getStreamIds())
+ && properties.equals(p.getProperties()));
}
return false;
}
@Override
public int hashCode() {
- return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(policyIds)
- .append(properties).build();
+ return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(dedupFields)
+ .append(dedupStateField).append(overrideDeduplicator).append(policyIds).append(streamIds)
+ .append(properties).build();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:")
- .append(policyIds).append(",properties:").append(properties);
+ .append(policyIds).append(",properties:").append(properties);
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index 5a5f2cc..4628043 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -85,7 +85,7 @@ public class StreamColumn implements Serializable {
this.setDefaultValue(Double.valueOf((String) this.getDefaultValue()));
break;
case BOOL:
- this.setDefaultValue(Double.valueOf((String) this.getDefaultValue()));
+ this.setDefaultValue(Boolean.valueOf((String) this.getDefaultValue()));
break;
case OBJECT:
try {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
index 65b9151..ff05fc8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
@@ -16,6 +16,7 @@
*/
package org.apache.eagle.alert.engine.coordinator;
+import org.apache.commons.lang.StringUtils;
import org.apache.eagle.alert.utils.TimePeriodUtils;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.commons.lang.builder.HashCodeBuilder;
@@ -45,7 +46,7 @@ public class StreamSortSpec implements Serializable {
}
public int getWindowPeriodMillis() {
- if (windowPeriod != null) {
+ if (StringUtils.isNotBlank(windowPeriod)) {
return TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(windowPeriod));
} else {
return 0;
@@ -76,9 +77,9 @@ public class StreamSortSpec implements Serializable {
@Override
public int hashCode() {
return new HashCodeBuilder()
- .append(windowPeriod)
- .append(windowMargin)
- .toHashCode();
+ .append(windowPeriod)
+ .append(windowMargin)
+ .toHashCode();
}
@Override
@@ -92,14 +93,14 @@ public class StreamSortSpec implements Serializable {
StreamSortSpec another = (StreamSortSpec) that;
return
- another.windowPeriod.equals(this.windowPeriod)
- && another.windowMargin == this.windowMargin;
+ another.windowPeriod.equals(this.windowPeriod)
+ && another.windowMargin == this.windowMargin;
}
@Override
public String toString() {
return String.format("StreamSortSpec[windowPeriod=%s,windowMargin=%d]",
- this.getWindowPeriod(),
- this.getWindowMargin());
+ this.getWindowPeriod(),
+ this.getWindowMargin());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
index 51e4532..ecca0ff 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
@@ -19,6 +19,7 @@ package org.apache.eagle.alert.engine.model;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import backtype.storm.tuple.Tuple;
import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import java.io.Serializable;
import java.util.Objects;
@@ -63,9 +64,9 @@ public class PartitionedEvent implements Serializable {
if (obj instanceof PartitionedEvent) {
PartitionedEvent another = (PartitionedEvent) obj;
return !(this.partitionKey != another.getPartitionKey()
- || !Objects.equals(this.event, another.getEvent())
- || !Objects.equals(this.partition, another.getPartition())
- || !Objects.equals(this.anchor, another.anchor));
+ || !Objects.equals(this.event, another.getEvent())
+ || !Objects.equals(this.partition, another.getPartition())
+ || !Objects.equals(this.anchor, another.anchor));
} else {
return false;
}
@@ -74,10 +75,11 @@ public class PartitionedEvent implements Serializable {
@Override
public int hashCode() {
return new HashCodeBuilder()
- .append(partitionKey)
- .append(event)
- .append(partition)
- .build();
+ .append(partitionKey)
+ .append(event)
+ .append(partition)
+ .append(anchor)
+ .build();
}
public StreamEvent getEvent() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
index 8480bc5..130985f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
@@ -95,7 +95,8 @@ public class StreamEvent implements Serializable {
}
if (obj instanceof StreamEvent) {
StreamEvent another = (StreamEvent) obj;
- return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp && Arrays.deepEquals(this.data, another.data);
+ return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp
+ && Arrays.deepEquals(this.data, another.data) && Objects.equals(this.metaVersion, another.metaVersion);
}
return false;
}
@@ -113,10 +114,10 @@ public class StreamEvent implements Serializable {
}
}
return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]",
- this.getStreamId(),
- DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
- StringUtils.join(dataStrings, ","),
- this.getMetaVersion());
+ this.getStreamId(),
+ DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
+ StringUtils.join(dataStrings, ","),
+ this.getMetaVersion());
}
public static StreamEventBuilder builder() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
index 5c3f35e..e37e9be 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
@@ -16,6 +16,7 @@
*/
package org.apache.eagle.alert.config;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -69,8 +70,8 @@ public class TestConfigBus {
}
@After
- public void shutdown() {
- CloseableUtils.closeQuietly(server);
+ public void shutdown() throws IOException {
+ server.stop();
producer.close();
consumer.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java
new file mode 100644
index 0000000..a252fae
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class Kafka2TupleMetadataTest {
+ @Test
+ public void testKafka2TupleMetadata() {
+ Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata();
+ kafka2TupleMetadata.setName("setName");
+ kafka2TupleMetadata.setCodec(new Tuple2StreamMetadata());
+ kafka2TupleMetadata.setType("setType");
+ kafka2TupleMetadata.setTopic("setTopic");
+ kafka2TupleMetadata.setSchemeCls("org.apache.eagle.alert.engine.scheme.PlainStringScheme");
+
+ Kafka2TupleMetadata kafka2TupleMetadata1 = new Kafka2TupleMetadata();
+ kafka2TupleMetadata1.setName("setName");
+ kafka2TupleMetadata1.setCodec(new Tuple2StreamMetadata());
+ kafka2TupleMetadata1.setType("setType");
+ kafka2TupleMetadata1.setTopic("setTopic");
+ kafka2TupleMetadata1.setSchemeCls("org.apache.eagle.alert.engine.scheme.PlainStringScheme");
+
+ Assert.assertFalse(kafka2TupleMetadata1 == kafka2TupleMetadata);
+ Assert.assertTrue(kafka2TupleMetadata1.equals(kafka2TupleMetadata));
+ Assert.assertTrue(kafka2TupleMetadata1.hashCode() == kafka2TupleMetadata.hashCode());
+
+ kafka2TupleMetadata1.setType("setType1");
+
+ Assert.assertFalse(kafka2TupleMetadata1.equals(kafka2TupleMetadata));
+ Assert.assertFalse(kafka2TupleMetadata1.hashCode() == kafka2TupleMetadata.hashCode());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java
new file mode 100644
index 0000000..71f3188
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PolicyWorkerQueueTest {
+ @Test
+ public void testPolicyWorkerQueue() {
+
+ List<WorkSlot> workers = new ArrayList<>();
+ WorkSlot workSlot1 = new WorkSlot("setTopologyName1", "setBoltId1");
+ WorkSlot workSlot2 = new WorkSlot("setTopologyName1", "setBoltId2");
+ workers.add(workSlot1);
+ workers.add(workSlot2);
+ PolicyWorkerQueue policyWorkerQueue = new PolicyWorkerQueue(workers);
+ Assert.assertEquals(null, policyWorkerQueue.getPartition());
+ Assert.assertEquals(workSlot1, policyWorkerQueue.getWorkers().get(0));
+ Assert.assertEquals(workSlot2, policyWorkerQueue.getWorkers().get(1));
+ Assert.assertEquals("[(setTopologyName1:setBoltId1),(setTopologyName1:setBoltId2)]", policyWorkerQueue.toString());
+
+ PolicyWorkerQueue policyWorkerQueue1 = new PolicyWorkerQueue();
+ policyWorkerQueue1.setWorkers(workers);
+
+ Assert.assertTrue(policyWorkerQueue.equals(policyWorkerQueue1));
+ Assert.assertTrue(policyWorkerQueue.hashCode() == policyWorkerQueue1.hashCode());
+
+ StreamSortSpec streamSortSpec = new StreamSortSpec();
+ streamSortSpec.setWindowPeriod("PT10S");
+ StreamPartition streamPartition = new StreamPartition();
+ List<String> columns = new ArrayList<>();
+ columns.add("jobId");
+ streamPartition.setColumns(columns);
+ streamPartition.setSortSpec(streamSortSpec);
+ streamPartition.setStreamId("test");
+ streamPartition.setType(StreamPartition.Type.GROUPBY);
+ policyWorkerQueue1.setPartition(streamPartition);
+
+ Assert.assertFalse(policyWorkerQueue.equals(policyWorkerQueue1));
+ Assert.assertFalse(policyWorkerQueue.hashCode() == policyWorkerQueue1.hashCode());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java
new file mode 100644
index 0000000..c416a49
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamRepartitionStrategyTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testStreamRepartitionStrategy() {
+ thrown.expect(NullPointerException.class);
+ StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
+ streamRepartitionStrategy.hashCode();
+ }
+
+ @Test
+ public void testStreamRepartitionStrategy1() {
+ thrown.expect(NullPointerException.class);
+ StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
+ streamRepartitionStrategy.equals(streamRepartitionStrategy);
+ }
+
+ @Test
+ public void testStreamRepartitionStrategy2() {
+
+ StreamSortSpec streamSortSpec = new StreamSortSpec();
+ streamSortSpec.setWindowPeriod("PT10S");
+ StreamPartition streamPartition = new StreamPartition();
+ List<String> columns = new ArrayList<>();
+ columns.add("jobId");
+ streamPartition.setColumns(columns);
+ streamPartition.setSortSpec(streamSortSpec);
+ streamPartition.setStreamId("test");
+ streamPartition.setType(StreamPartition.Type.GROUPBY);
+
+
+ StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
+ Assert.assertEquals(null, streamRepartitionStrategy.getPartition());
+ Assert.assertEquals(0, streamRepartitionStrategy.getNumTotalParticipatingRouterBolts());
+ Assert.assertEquals(0, streamRepartitionStrategy.getStartSequence());
+ streamRepartitionStrategy.setPartition(streamPartition);
+ StreamRepartitionStrategy streamRepartitionStrategy1 = new StreamRepartitionStrategy();
+ streamRepartitionStrategy1.setPartition(streamPartition);
+
+ Assert.assertTrue(streamRepartitionStrategy.equals(streamRepartitionStrategy1));
+ Assert.assertTrue(streamRepartitionStrategy.hashCode() == streamRepartitionStrategy1.hashCode());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java
new file mode 100644
index 0000000..88e72cb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamRouterSpecTest {
+ @Test
+ public void testStreamRouterSpec() {
+ StreamRouterSpec streamRouterSpec = new StreamRouterSpec();
+ Assert.assertEquals(null, streamRouterSpec.getPartition());
+ Assert.assertEquals(null, streamRouterSpec.getStreamId());
+ Assert.assertTrue(streamRouterSpec.getTargetQueue().isEmpty());
+
+ List<WorkSlot> workers = new ArrayList<>();
+ WorkSlot workSlot1 = new WorkSlot("setTopologyName1", "setBoltId1");
+ WorkSlot workSlot2 = new WorkSlot("setTopologyName1", "setBoltId2");
+ workers.add(workSlot1);
+ workers.add(workSlot2);
+ PolicyWorkerQueue policyWorkerQueue = new PolicyWorkerQueue(workers);
+ streamRouterSpec.addQueue(policyWorkerQueue);
+ streamRouterSpec.setStreamId("streamRouterSpec");
+
+ Assert.assertEquals("streamRouterSpec", streamRouterSpec.getStreamId());
+ Assert.assertEquals(1, streamRouterSpec.getTargetQueue().size());
+ Assert.assertEquals(2, streamRouterSpec.getTargetQueue().get(0).getWorkers().size());
+
+ StreamRouterSpec streamRouterSpec1 = new StreamRouterSpec();
+ streamRouterSpec1.addQueue(policyWorkerQueue);
+ streamRouterSpec1.setStreamId("streamRouterSpec1");
+
+ Assert.assertFalse(streamRouterSpec.equals(streamRouterSpec1));
+
+ streamRouterSpec1.setStreamId("streamRouterSpec");
+
+ Assert.assertTrue(streamRouterSpec.equals(streamRouterSpec1));
+ Assert.assertTrue(streamRouterSpec.hashCode() == streamRouterSpec1.hashCode());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java
new file mode 100644
index 0000000..8bbfc41
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class Tuple2StreamMetadataTest {
+ @Test
+ public void testTuple2StreamMetadata() {
+ Tuple2StreamMetadata metadata = new Tuple2StreamMetadata();
+ Set activeStreamNames = new HashSet<>();
+ activeStreamNames.add("defaultStringStream");
+ metadata.setStreamNameSelectorCls("org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector");
+ metadata.setStreamNameSelectorProp(new Properties());
+ metadata.getStreamNameSelectorProp().put("userProvidedStreamName", "defaultStringStream");
+ metadata.setActiveStreamNames(activeStreamNames);
+ metadata.setTimestampColumn("timestamp");
+
+ Tuple2StreamMetadata metadata1 = new Tuple2StreamMetadata();
+ Set activeStreamNames1 = new HashSet<>();
+ activeStreamNames1.add("defaultStringStream");
+ metadata1.setStreamNameSelectorCls("org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector");
+ metadata1.setStreamNameSelectorProp(new Properties());
+ metadata1.getStreamNameSelectorProp().put("userProvidedStreamName", "defaultStringStream");
+ metadata1.setActiveStreamNames(activeStreamNames1);
+ metadata1.setTimestampColumn("timestamp");
+
+ Assert.assertFalse(metadata == metadata1);
+ Assert.assertFalse(metadata.equals(metadata1));
+ Assert.assertFalse(metadata.hashCode() == metadata1.hashCode());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java
new file mode 100644
index 0000000..48ee73b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class WorkSlotTest {
+    /**
+     * Checks the accessors and toString() of a {@link WorkSlot} filled through setters, then
+     * compares it with a slot created through the two-argument constructor.
+     */
+    @Test
+    public void testWorkSlot() {
+        // A freshly created slot has neither a topology name nor a bolt id.
+        WorkSlot viaSetters = new WorkSlot();
+        Assert.assertEquals("(null:null)", viaSetters.toString());
+        Assert.assertNull(viaSetters.getBoltId());
+        Assert.assertNull(viaSetters.getTopologyName());
+
+        viaSetters.setBoltId("setBoltId");
+        viaSetters.setTopologyName("setTopologyName");
+        Assert.assertEquals("(setTopologyName:setBoltId)", viaSetters.toString());
+        Assert.assertEquals("setBoltId", viaSetters.getBoltId());
+        Assert.assertEquals("setTopologyName", viaSetters.getTopologyName());
+
+        // The constructor takes the topology name first, then the bolt id.
+        WorkSlot viaConstructor = new WorkSlot("setTopologyName", "setBoltId");
+        Assert.assertEquals("(setTopologyName:setBoltId)", viaConstructor.toString());
+        Assert.assertEquals("setBoltId", viaConstructor.getBoltId());
+        Assert.assertEquals("setTopologyName", viaConstructor.getTopologyName());
+
+        // Both construction paths must yield equal slots with equal hash codes.
+        Assert.assertEquals(viaConstructor, viaSetters);
+        Assert.assertEquals(viaConstructor.hashCode(), viaSetters.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java
new file mode 100644
index 0000000..a2c0d6e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class MonitoredStreamTest {
+
+    /**
+     * Exercises queue add/remove on a {@link MonitoredStream} and checks equals()/hashCode()
+     * against a second stream created from the same {@link StreamGroup}.
+     */
+    @Test
+    public void testMonitoredStream() {
+        // Fixture: one GROUPBY partition on "jobId" with a PT10S sort window, placed in a group.
+        StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowPeriod("PT10S");
+        List<String> groupByColumns = new ArrayList<>();
+        groupByColumns.add("jobId");
+        StreamPartition partition = new StreamPartition();
+        partition.setColumns(groupByColumns);
+        partition.setSortSpec(sortSpec);
+        partition.setStreamId("test");
+        partition.setType(StreamPartition.Type.GROUPBY);
+        StreamGroup group = new StreamGroup();
+        group.addStreamPartition(partition);
+
+        // A queue over that group backed by a single work slot.
+        List<WorkSlot> slots = new ArrayList<>();
+        slots.add(new WorkSlot("setTopologyName", "setBoltId"));
+        StreamWorkSlotQueue queue = new StreamWorkSlotQueue(group, false, new HashMap<>(), slots);
+
+        MonitoredStream stream = new MonitoredStream(group);
+        Assert.assertNull(stream.getVersion());
+        Assert.assertTrue(stream.getQueues().isEmpty());
+        Assert.assertEquals(group, stream.getStreamGroup());
+        stream.addQueues(queue);
+        Assert.assertEquals(queue, stream.getQueues().get(0));
+
+        // Expected to be equal although only the first stream currently holds a queue.
+        MonitoredStream sameGroupStream = new MonitoredStream(group);
+        Assert.assertEquals(stream, sameGroupStream);
+        Assert.assertEquals(stream.hashCode(), sameGroupStream.hashCode());
+
+        stream.removeQueue(queue);
+        Assert.assertTrue(stream.getQueues().isEmpty());
+
+        // Equality and hash codes still match once the queue is gone.
+        Assert.assertEquals(stream, sameGroupStream);
+        Assert.assertEquals(stream.hashCode(), sameGroupStream.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java
new file mode 100644
index 0000000..1491c77
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PolicyAssignmentTest {
+    /**
+     * Checks the accessors and toString() of a {@link PolicyAssignment}, and that a second
+     * assignment built from the same arguments is still treated as a different object.
+     */
+    @Test
+    public void testPolicyAssignment() {
+        PolicyAssignment assignment = new PolicyAssignment("policy", "queue");
+        Assert.assertEquals("policy", assignment.getPolicyName());
+        Assert.assertEquals("queue", assignment.getQueueId());
+        Assert.assertNull(assignment.getVersion());
+        Assert.assertEquals("PolicyAssignment of policy policy, queueId queue, version null !", assignment.toString());
+
+        // Same constructor arguments, separate instance: expected to be unequal in every respect.
+        PolicyAssignment twin = new PolicyAssignment("policy", "queue");
+        Assert.assertFalse(assignment.equals(twin));
+        Assert.assertNotSame(assignment, twin);
+        Assert.assertFalse(assignment.hashCode() == twin.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
new file mode 100644
index 0000000..d0f0189
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamGroupTest {
+    /**
+     * Grows a {@link StreamGroup} partition by partition and checks the derived stream id and
+     * toString() after each step, then compares it with a group holding the same partitions.
+     */
+    @Test
+    public void testStreamGroup() {
+        StreamGroup group = new StreamGroup();
+        Assert.assertEquals("StreamGroup partitions=: [] ", group.toString());
+        Assert.assertEquals("SG[]", group.getStreamId());
+
+        // First partition: GROUPBY on "jobId" with a PT10S sort window.
+        StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowPeriod("PT10S");
+        List<String> groupByColumns = new ArrayList<>();
+        groupByColumns.add("jobId");
+        StreamPartition firstPartition = new StreamPartition();
+        firstPartition.setColumns(groupByColumns);
+        firstPartition.setSortSpec(sortSpec);
+        firstPartition.setStreamId("test");
+        firstPartition.setType(StreamPartition.Type.GROUPBY);
+        group.addStreamPartition(firstPartition);
+        Assert.assertEquals("SG[test-]", group.getStreamId());
+        Assert.assertEquals("StreamGroup partitions=: [StreamPartition[streamId=test,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]]] ", group.toString());
+
+        // The very same partition object is renamed and added again, so the group is expected
+        // to list "test1" twice.
+        List<StreamPartition> renamedBatch = new ArrayList<>();
+        firstPartition.setStreamId("test1");
+        renamedBatch.add(firstPartition);
+        group.addStreamPartitions(renamedBatch);
+        Assert.assertEquals("SG[test1-test1-]", group.getStreamId());
+
+        // A partition with only a stream id set: type, columns and sort spec keep their defaults.
+        List<StreamPartition> bareBatch = new ArrayList<>();
+        StreamPartition barePartition = new StreamPartition();
+        barePartition.setStreamId("test2");
+        bareBatch.add(barePartition);
+        group.addStreamPartitions(bareBatch);
+        Assert.assertEquals("SG[test1-test1-test2-]", group.getStreamId());
+        Assert.assertEquals("StreamGroup partitions=: [StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test2,type=null,columns=[],sortSpec=[null]]] ", group.toString());
+
+        // A group fed with the same partitions must be equal and share the hash code.
+        StreamGroup copy = new StreamGroup();
+        copy.addStreamPartitions(group.getStreamPartitions());
+        Assert.assertEquals(group, copy);
+        Assert.assertEquals(group.hashCode(), copy.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java
new file mode 100644
index 0000000..bc2f74e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class StreamWorkSlotQueueTest {
+    /**
+     * Checks the state exposed by a freshly built {@link StreamWorkSlotQueue} and that a second
+     * queue built from the same arguments is treated as a different queue.
+     */
+    @Test
+    public void testStreamWorkSlotQueue() {
+        // Fixture: one GROUPBY partition on "jobId" with a PT10S sort window, placed in a group.
+        StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowPeriod("PT10S");
+        List<String> groupByColumns = new ArrayList<>();
+        groupByColumns.add("jobId");
+        StreamPartition partition = new StreamPartition();
+        partition.setColumns(groupByColumns);
+        partition.setSortSpec(sortSpec);
+        partition.setStreamId("test");
+        partition.setType(StreamPartition.Type.GROUPBY);
+        StreamGroup group = new StreamGroup();
+        group.addStreamPartition(partition);
+
+        WorkSlot slot = new WorkSlot("setTopologyName", "setBoltId");
+        List<WorkSlot> slots = new ArrayList<>();
+        slots.add(slot);
+        StreamWorkSlotQueue queue = new StreamWorkSlotQueue(group, false, new HashMap<>(), slots);
+
+        // Only the prefix of the queue id is pinned; it starts with the group's stream id.
+        Assert.assertTrue(queue.getQueueId().startsWith("SG[test-]"));
+        Assert.assertTrue(queue.getDedicateOption().isEmpty());
+        Assert.assertEquals(0, queue.getNumberOfGroupBolts());
+        Assert.assertEquals(1, queue.getQueueSize());
+        Assert.assertTrue(queue.getTopoGroupStartIndex().isEmpty());
+        Assert.assertEquals(-1, queue.getTopologyGroupStartIndex(""));
+        Assert.assertEquals(slot, queue.getWorkingSlots().get(0));
+
+        // Same constructor arguments, separate instance: expected to be unequal in every respect.
+        StreamWorkSlotQueue twin = new StreamWorkSlotQueue(group, false, new HashMap<>(), slots);
+        Assert.assertFalse(queue.equals(twin));
+        Assert.assertNotSame(queue, twin);
+        Assert.assertFalse(queue.hashCode() == twin.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java
new file mode 100644
index 0000000..760657a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TopologyTest {
+    /**
+     * Checks the values reported by a {@link Topology} created with a name, a group-bolt count
+     * and an alert-bolt count, and that an identically built topology is a different object.
+     */
+    @Test
+    public void testTopology() {
+        Topology first = new Topology("test", 2, 3);
+        Assert.assertNull(first.getClusterName());
+        Assert.assertEquals("test", first.getName());
+        Assert.assertNull(first.getPubBoltId());
+        Assert.assertNull(first.getSpoutId());
+        Assert.assertEquals(0, first.getAlertBoltIds().size());
+        Assert.assertEquals(1, first.getAlertParallelism());
+        Assert.assertEquals(0, first.getGroupNodeIds().size());
+        Assert.assertEquals(1, first.getGroupParallelism());
+        // The constructor arguments (2, 3) surface as the group-bolt and alert-bolt counts.
+        Assert.assertEquals(3, first.getNumOfAlertBolt());
+        Assert.assertEquals(2, first.getNumOfGroupBolt());
+        Assert.assertEquals(0, first.getNumOfPublishBolt());
+        Assert.assertEquals(1, first.getNumOfSpout());
+        Assert.assertEquals(1, first.getSpoutParallelism());
+
+        // Same constructor arguments, separate instance: expected to be unequal in every respect.
+        Topology second = new Topology("test", 2, 3);
+        Assert.assertFalse(second.equals(first));
+        Assert.assertFalse(second.hashCode() == first.hashCode());
+        Assert.assertNotSame(second, first);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java
new file mode 100644
index 0000000..cc84c56
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit test for the equals()/hashCode() behaviour of {@link OverrideDeduplicatorSpec}.
+ */
+public class OverrideDeduplicatorSpecTest {
+
+    /**
+     * Compares two specs in three scenarios: identical class name and properties, a different
+     * class name, and a different properties map.
+     */
+    @Test
+    public void testOverrideDeduplicatorSpec() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("kafka_broker", "localhost:9092");
+        properties.put("topic", "TEST_TOPIC_NAME");
+        OverrideDeduplicatorSpec overrideDeduplicatorSpec = new OverrideDeduplicatorSpec();
+        overrideDeduplicatorSpec.setClassName("testClass");
+        overrideDeduplicatorSpec.setProperties(properties);
+
+        // Scenario 1: same class name and the same properties map -> equal, same hash code.
+        OverrideDeduplicatorSpec overrideDeduplicatorSpec1 = new OverrideDeduplicatorSpec();
+        overrideDeduplicatorSpec1.setClassName("testClass");
+        overrideDeduplicatorSpec1.setProperties(properties);
+
+        Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec);
+        Assert.assertTrue(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec));
+        Assert.assertTrue(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode());
+
+        // Scenario 2: only the class name differs -> not equal, different hash code.
+        overrideDeduplicatorSpec1.setClassName("testClass1");
+
+        Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec);
+        Assert.assertFalse(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec));
+        Assert.assertFalse(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode());
+
+        // Scenario 3: class name restored, properties differ (one entry instead of two).
+        // The entry must go into the new map; putting it into the original map again would be
+        // a no-op and leave the new map empty.
+        overrideDeduplicatorSpec1.setClassName("testClass");
+        Map<String, String> properties1 = new HashMap<>();
+        properties1.put("kafka_broker", "localhost:9092");
+        overrideDeduplicatorSpec1.setProperties(properties1);
+
+        Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec);
+        Assert.assertFalse(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec));
+        Assert.assertFalse(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
new file mode 100644
index 0000000..7acb4f7
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+public class PolicyDefinitionTest {
+
+ @Test
+ public void testPolicyInnerDefinition() {
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ def.setValue("test");
+ def.setType("siddhi");
+ def.setHandlerClass("setHandlerClass");
+ def.setProperties(new HashMap<>());
+ def.setOutputStreams(Arrays.asList("outputStream"));
+ def.setInputStreams(Arrays.asList("inputStream"));
+ Assert.assertEquals("{type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }", def.toString());
+
+ PolicyDefinition.Definition def1 = new PolicyDefinition.Definition();
+ def1.setValue("test");
+ def1.setType("siddhi");
+ def1.setHandlerClass("setHandlerClass");
+ def1.setProperties(new HashMap<>());
+ def1.setOutputStreams(Arrays.asList("outputStream"));
+ def1.setInputStreams(Arrays.asList("inputStream"));
+
+ Assert.assertFalse(def == def1);
+ Assert.assertTrue(def.equals(def1));
+ Assert.assertTrue(def.hashCode() == def1.hashCode());
+
+ def1.setInputStreams(Arrays.asList("inputStream1"));
+
+ Assert.assertFalse(def.equals(def1));
+ Assert.assertTrue(def.hashCode() == def1.hashCode());//problem equals() and hashCode() be inconsistent
+
+ }
+
+ @Test
+ public void testPolicyDefinition() {
+ PolicyDefinition pd = new PolicyDefinition();
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ def.setValue("test");
+ def.setType("siddhi");
+ def.setHandlerClass("setHandlerClass");
+ def.setProperties(new HashMap<>());
+ def.setOutputStreams(Arrays.asList("outputStream"));
+ def.setInputStreams(Arrays.asList("inputStream"));
+ pd.setDefinition(def);
+ pd.setInputStreams(Arrays.asList("inputStream"));//confuse with PolicyDefinition.Definition InputStreams
+ pd.setOutputStreams(Arrays.asList("outputStream"));//confuse with PolicyDefinition.Definition OutputStreams
+ pd.setName("policyName");
+ pd.setDescription(String.format("Test policy for stream %s", "streamName"));
+
+ StreamPartition sp = new StreamPartition();
+ sp.setStreamId("streamName");
+ sp.setColumns(Arrays.asList("host"));
+ sp.setType(StreamPartition.Type.GROUPBY);
+ pd.addPartition(sp);
+ Assert.assertEquals("{name=\"policyName\",definition={type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }}", pd.toString());
+
+ PolicyDefinition pd1 = new PolicyDefinition();
+ PolicyDefinition.Definition def1 = new PolicyDefinition.Definition();
+ def1.setValue("test");
+ def1.setType("siddhi");
+ def1.setHandlerClass("setHandlerClass");
+ def1.setProperties(new HashMap<>());
+ def1.setOutputStreams(Arrays.asList("outputStream"));
+ def1.setInputStreams(Arrays.asList("inputStream"));
+ pd1.setDefinition(def1);
+ pd1.setInputStreams(Arrays.asList("inputStream"));//confuse with PolicyDefinition.Definition InputStreams
+ pd1.setOutputStreams(Arrays.asList("outputStream"));//confuse with PolicyDefinition.Definition OutputStreams
+ pd1.setName("policyName");
+ pd1.setDescription(String.format("Test policy for stream %s", "streamName"));
+
+ StreamPartition sp1 = new StreamPartition();
+ sp1.setStreamId("streamName");
+ sp1.setColumns(Arrays.asList("host"));
+ sp1.setType(StreamPartition.Type.GROUPBY);
+ pd1.addPartition(sp1);
+
+
+ Assert.assertFalse(pd == pd1);
+ Assert.assertTrue(pd.equals(pd1));
+ Assert.assertTrue(pd.hashCode() == pd1.hashCode());
+ sp1.setStreamId("streamName1");
+
+ Assert.assertFalse(pd == pd1);
+ Assert.assertFalse(pd.equals(pd1));
+ Assert.assertFalse(pd.hashCode() == pd1.hashCode());
+
+ sp1.setStreamId("streamName");
+ def1.setOutputStreams(Arrays.asList("outputStream1"));
+
+ Assert.assertFalse(pd == pd1);
+ Assert.assertFalse(pd.equals(pd1));
+
+ Assert.assertTrue(pd.hashCode() == pd1.hashCode());//problem equals() and hashCode() be inconsistent
+
+ }
+
+ @Test
+ public void testPolicyDefinitionEqualByPolicyStatus() {
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ PolicyDefinition policy1 = new PolicyDefinition();
+ policy1.setName("policy1");
+ policy1.setDefinition(definition);
+
+ PolicyDefinition policy2 = new PolicyDefinition();
+ policy2.setName("policy1");
+ policy2.setPolicyStatus(PolicyDefinition.PolicyStatus.DISABLED);
+ policy2.setDefinition(definition);
+
+ PolicyDefinition policy3 = new PolicyDefinition();
+ policy3.setName("policy1");
+ policy3.setDefinition(definition);
+
+ Assert.assertTrue(policy1.equals(policy3));
+ Assert.assertFalse(policy1.equals(policy2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java
new file mode 100644
index 0000000..494d8ca
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+/**
+ * Unit test for the toString(), equals() and hashCode() behaviour of {@link Publishment}.
+ */
+public class PublishmentTest {
+    /**
+     * Builds two publishments from the same values, checks that they are equal, then changes
+     * the override deduplicator and the stream ids of the second one in turn and checks that
+     * each change breaks equality and alters the hash code.
+     */
+    @Test
+    public void testPublishment() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("kafka_broker", "localhost:9092");
+        properties.put("topic", "TEST_TOPIC_NAME");
+
+        List<Map<String, Object>> kafkaClientConfig = new ArrayList<>();
+        kafkaClientConfig.add(ImmutableMap.of("name", "producer.type", "value", "sync"));
+        properties.put("kafka_client_config", kafkaClientConfig);
+
+        // Only the policy's name is used below, as the publishment's policy id.
+        PolicyDefinition policy = createPolicy("testStream", "testPolicy");
+        Publishment publishment = new Publishment();
+        publishment.setName("testAsyncPublishment");
+        publishment.setType("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher");
+        publishment.setPolicyIds(Arrays.asList(policy.getName()));
+        publishment.setDedupIntervalMin("PT0M");
+        OverrideDeduplicatorSpec overrideDeduplicatorSpec = new OverrideDeduplicatorSpec();
+        overrideDeduplicatorSpec.setClassName("testClass");
+        publishment.setOverrideDeduplicator(overrideDeduplicatorSpec);
+        publishment.setSerializer("org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer");
+        publishment.setProperties(properties);
+
+        // NOTE(review): the expected text depends on the iteration order of the HashMap above.
+        Assert.assertEquals("Publishment[name:testAsyncPublishment,type:org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher,policyId:[testPolicy],properties:{kafka_client_config=[{name=producer.type, value=sync}], topic=TEST_TOPIC_NAME, kafka_broker=localhost:9092}", publishment.toString());
+
+
+        // Second publishment with the same values but its own deduplicator spec instance.
+        Publishment publishment1 = new Publishment();
+        publishment1.setName("testAsyncPublishment");
+        publishment1.setType("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher");
+        publishment1.setPolicyIds(Arrays.asList(policy.getName()));
+        publishment1.setDedupIntervalMin("PT0M");
+        OverrideDeduplicatorSpec overrideDeduplicatorSpec1 = new OverrideDeduplicatorSpec();
+        overrideDeduplicatorSpec1.setClassName("testClass");
+        publishment1.setOverrideDeduplicator(overrideDeduplicatorSpec1);
+        publishment1.setSerializer("org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer");
+        publishment1.setProperties(properties);
+
+        Assert.assertTrue(publishment.equals(publishment1));
+        Assert.assertTrue(publishment.hashCode() == publishment1.hashCode());
+        Assert.assertFalse(publishment == publishment1);
+
+        // A different deduplicator class name must break equality and change the hash code.
+        publishment1.getOverrideDeduplicator().setClassName("testClass1");
+
+
+        Assert.assertFalse(publishment.equals(publishment1));
+        Assert.assertFalse(publishment.hashCode() == publishment1.hashCode());
+        Assert.assertFalse(publishment == publishment1);
+
+        // Restore the class name and set stream ids on one side only. The two ids are passed
+        // as separate list elements rather than as one comma-joined string.
+        publishment1.getOverrideDeduplicator().setClassName("testClass");
+        publishment1.setStreamIds(Arrays.asList("streamid1", "streamid2"));
+        Assert.assertFalse(publishment.equals(publishment1));
+        Assert.assertFalse(publishment.hashCode() == publishment1.hashCode());
+        Assert.assertFalse(publishment == publishment1);
+    }
+
+    /**
+     * Creates a siddhi policy fixture with one GROUPBY partition on column "host".
+     *
+     * @param streamName stream id of the partition; also embedded in the description
+     * @param policyName name given to the policy
+     * @return the populated policy definition
+     */
+    private PolicyDefinition createPolicy(String streamName, String policyName) {
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        // expression, something like "PT5S,dynamic,1,host"
+        def.setValue("test");
+        def.setType("siddhi");
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList("inputStream"));
+        pd.setOutputStreams(Arrays.asList("outputStream"));
+        pd.setName(policyName);
+        pd.setDescription(String.format("Test policy for stream %s", streamName));
+
+        StreamPartition sp = new StreamPartition();
+        sp.setStreamId(streamName);
+        sp.setColumns(Arrays.asList("host"));
+        sp.setType(StreamPartition.Type.GROUPBY);
+        pd.addPartition(sp);
+        return pd;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
new file mode 100644
index 0000000..91f9cf8
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+ public class PublishmentTypeTest {
+     /** Asserts that PublishmentType equality and hash code follow its field values rather than identity. */
+     @Test
+     public void testPublishmentType() {
+         PublishmentType publishmentType = newKafkaType();
+         PublishmentType publishmentType1 = newKafkaType();
+         // Not equal to an unrelated type; equal, with the same hash, to a distinct twin instance.
+         Assert.assertFalse(publishmentType.equals(""));
+         Assert.assertNotSame(publishmentType, publishmentType1);
+         Assert.assertEquals(publishmentType, publishmentType1);
+         Assert.assertEquals(publishmentType.hashCode(), publishmentType1.hashCode());
+         publishmentType1.setType("JMS"); // a single differing field must break equality
+         Assert.assertFalse(publishmentType.equals(publishmentType1));
+         Assert.assertFalse(publishmentType.hashCode() == publishmentType1.hashCode());
+     }
+
+     /** Returns a fresh PublishmentType populated with the fixture values shared by both instances under test. */
+     private static PublishmentType newKafkaType() {
+         PublishmentType type = new PublishmentType();
+         type.setType("KAFKA");
+         type.setClassName("setClassName");
+         type.setDescription("setDescription");
+         type.setFields("setFields");
+         return type;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java
new file mode 100644
index 0000000..ccc7717
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+
+
+ /** Exercises how StreamColumn.Builder treats the default value text for each StreamColumn.Type. */
+ public class StreamColumnTest {
+     @Rule
+     public ExpectedException thrown = ExpectedException.none();
+
+     @Test
+     public void testStreamStringColumn() {
+         StreamColumn column = newColumn("NAMEyhd", StreamColumn.Type.STRING, "EAGLEyhd", true);
+         column.setNodataExpression("PT1M,dynamic,1,NAMEyhd");
+         Assert.assertEquals("StreamColumn=name[NAMEyhd], type=[string], defaultValue=[EAGLEyhd], required=[true], nodataExpression=[PT1M,dynamic,1,NAMEyhd]", column.toString());
+         Assert.assertTrue(column.getDefaultValue() instanceof String);
+     }
+
+     @Test
+     public void testStreamLongColumn() {
+         thrown.expect(NumberFormatException.class); // non-numeric default on a LONG column
+         newColumn("salary", StreamColumn.Type.LONG, "eagle", true);
+     }
+
+     @Test
+     public void testStreamLongColumn1() {
+         StreamColumn column = newColumn("salary", StreamColumn.Type.LONG, "0", true);
+         column.setNodataExpression("PT1M,dynamic,1,salary");
+         Assert.assertEquals("StreamColumn=name[salary], type=[long], defaultValue=[0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", column.toString());
+         Assert.assertTrue(column.getDefaultValue() instanceof Long);
+     }
+
+     @Test
+     public void testStreamDoubleColumn() {
+         thrown.expect(NumberFormatException.class); // non-numeric default on a DOUBLE column
+         newColumn("salary", StreamColumn.Type.DOUBLE, "eagle", true);
+     }
+
+     @Test
+     public void testStreamDoubleColumn1() {
+         StreamColumn fractional = newColumn("salary", StreamColumn.Type.DOUBLE, "0.1", true);
+         fractional.setNodataExpression("PT1M,dynamic,1,salary");
+         Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", fractional.toString());
+         StreamColumn negative = newColumn("salary", StreamColumn.Type.DOUBLE, "-0.1", true);
+         negative.setNodataExpression("PT1M,dynamic,1,salary");
+         Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[-0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", negative.toString());
+         StreamColumn integral = newColumn("salary", StreamColumn.Type.DOUBLE, "1", true); // expected to render as 1.0
+         integral.setNodataExpression("PT1M,dynamic,1,salary");
+         Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[1.0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", integral.toString());
+         Assert.assertTrue(integral.getDefaultValue() instanceof Double);
+     }
+
+     @Test
+     public void testStreamFloatColumn() {
+         thrown.expect(NumberFormatException.class); // non-numeric default on a FLOAT column
+         newColumn("salary", StreamColumn.Type.FLOAT, "eagle", true);
+     }
+
+     @Test
+     public void testStreamFloatColumn1() {
+         StreamColumn fractional = newColumn("salary", StreamColumn.Type.FLOAT, "0.1", true);
+         fractional.setNodataExpression("PT1M,dynamic,1,salary");
+         Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", fractional.toString());
+         StreamColumn negative = newColumn("salary", StreamColumn.Type.FLOAT, "-0.1", true);
+         negative.setNodataExpression("PT1M,dynamic,1,salary");
+         Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[-0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", negative.toString());
+         StreamColumn integral = newColumn("salary", StreamColumn.Type.FLOAT, "1", true); // expected to render as 1.0
+         integral.setNodataExpression("PT1M,dynamic,1,salary");
+         Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[1.0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", integral.toString());
+         Assert.assertTrue(integral.getDefaultValue() instanceof Float);
+     }
+
+     @Test
+     public void testStreamIntColumn() {
+         thrown.expect(NumberFormatException.class); // non-numeric default on an INT column
+         newColumn("salary", StreamColumn.Type.INT, "eagle", true);
+     }
+
+     @Test
+     public void testStreamIntColumn1() {
+         thrown.expect(NumberFormatException.class); // fractional default on an INT column
+         newColumn("salary", StreamColumn.Type.INT, "0.1", true);
+     }
+
+     @Test
+     public void testStreamIntColumn2() {
+         StreamColumn one = newColumn("salary", StreamColumn.Type.INT, "1", true);
+         one.setNodataExpression("PT1M,dynamic,1,salary");
+         Assert.assertEquals("StreamColumn=name[salary], type=[int], defaultValue=[1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", one.toString());
+         StreamColumn zero = newColumn("salary", StreamColumn.Type.INT, "0", true);
+         zero.setNodataExpression("PT1M,dynamic,1,salary");
+         Assert.assertEquals("StreamColumn=name[salary], type=[int], defaultValue=[0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", zero.toString());
+         Assert.assertTrue(zero.getDefaultValue() instanceof Integer);
+     }
+
+     @Test
+     public void testStreamBoolColumn() {
+         // "eagle", "1" and "0" are each asserted to render as false; only "True" is asserted to render as true.
+         StreamColumn fromWord = newColumn("isYhd", StreamColumn.Type.BOOL, "eagle", false);
+         fromWord.setNodataExpression("PT1M,dynamic,1,isYhd");
+         Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[false], nodataExpression=[PT1M,dynamic,1,isYhd]", fromWord.toString());
+         StreamColumn fromOne = newColumn("isYhd", StreamColumn.Type.BOOL, "1", true);
+         fromOne.setNodataExpression("PT1M,dynamic,1,isYhd");
+         Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", fromOne.toString());
+         StreamColumn fromZero = newColumn("isYhd", StreamColumn.Type.BOOL, "0", true);
+         fromZero.setNodataExpression("PT1M,dynamic,1,isYhd");
+         Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", fromZero.toString());
+         StreamColumn fromTrue = newColumn("isYhd", StreamColumn.Type.BOOL, "True", true);
+         fromTrue.setNodataExpression("PT1M,dynamic,1,isYhd");
+         Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[true], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", fromTrue.toString());
+         Assert.assertTrue(fromTrue.getDefaultValue() instanceof Boolean);
+     }
+
+     @Test
+     public void testStreamObjectColumn() {
+         thrown.expect(IllegalArgumentException.class); // plain text default on an OBJECT column
+         newColumn("name", StreamColumn.Type.OBJECT, "eagle", true);
+     }
+
+     @Test
+     public void testStreamObjectColumn1() {
+         StreamColumn column = newColumn("name", StreamColumn.Type.OBJECT, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", true);
+         column.setNodataExpression("PT1M,dynamic,1,name");
+         Assert.assertEquals("StreamColumn=name[name], type=[object], defaultValue=[{name=heap.COMMITTED, Value=175636480}], required=[true], nodataExpression=[PT1M,dynamic,1,name]", column.toString());
+         Assert.assertTrue(column.getDefaultValue() instanceof HashMap);
+     }
+
+     /** Builds a column through StreamColumn.Builder from the given name, type, default value text and required flag. */
+     private static StreamColumn newColumn(String name, StreamColumn.Type type, String defaultValue, boolean required) {
+         return new StreamColumn.Builder().name(name).type(type).defaultValue(defaultValue).required(required).build();
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
new file mode 100644
index 0000000..b5015cd
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+ /** Covers toString(), getColumnIndex() and copy() of StreamDefinition with a five-column fixture. */
+ public class StreamDefinitionTest {
+     @Test
+     public void testStreamDefinition() {
+         List<StreamColumn> columns = new ArrayList<>();
+         columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+         columns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+         columns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+         columns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+         columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+         String populated = "StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[false], nodataExpression=[null]]";
+         StreamDefinition definition = new StreamDefinition();
+         Assert.assertEquals("StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[]", definition.toString());
+         definition.setColumns(columns);
+         // Lookup is asserted to match the exact name only: "data" is found at index 3, "DATA" and "isYhd" give -1.
+         Assert.assertEquals(3, definition.getColumnIndex("data"));
+         Assert.assertEquals(-1, definition.getColumnIndex("DATA"));
+         Assert.assertEquals(-1, definition.getColumnIndex("isYhd"));
+         Assert.assertEquals(populated, definition.toString());
+         StreamDefinition duplicate = definition.copy();
+         Assert.assertEquals(populated, duplicate.toString());
+         // The copy renders identically, yet is asserted to be a distinct object that is neither equal nor hash-equal.
+         Assert.assertFalse(duplicate.equals(definition));
+         Assert.assertFalse(duplicate == definition);
+         Assert.assertFalse(duplicate.hashCode() == definition.hashCode());
+     }
+ }