You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/09 13:56:05 UTC
[1/2] incubator-eagle git commit: [EAGLE-749] Add unit test for model.
Repository: incubator-eagle
Updated Branches:
refs/heads/master 1da8dc4f1 -> 2b61cef58
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamPartitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamPartitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamPartitionTest.java
new file mode 100644
index 0000000..f7615df
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamPartitionTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Exercises {@link StreamPartition}: its string form before and after the fields are set, and
+ * that an instance built via {@code new StreamPartition(partition)} is equal to its source and
+ * reports the same hash code.
+ */
+public class StreamPartitionTest {
+    @Test
+    public void testStreamPartition() {
+        StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowPeriod("PT10S");
+
+        // A freshly constructed partition renders with every field unset.
+        StreamPartition partition = new StreamPartition();
+        Assert.assertEquals("StreamPartition[streamId=null,type=null,columns=[],sortSpec=[null]]", partition.toString());
+
+        List<String> groupByColumns = new ArrayList<>();
+        groupByColumns.add("jobId");
+        partition.setColumns(groupByColumns);
+        partition.setSortSpec(sortSpec);
+        partition.setStreamId("test");
+        partition.setType(StreamPartition.Type.GROUPBY);
+        Assert.assertEquals("StreamPartition[streamId=test,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]]", partition.toString());
+
+        // An instance built from the populated partition must be equal and hash identically.
+        Assert.assertEquals(partition, new StreamPartition(partition));
+        Assert.assertEquals(partition.hashCode(), new StreamPartition(partition).hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpecTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpecTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpecTest.java
new file mode 100644
index 0000000..d8123f1
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpecTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Exercises {@link StreamSortSpec}: the values reported by a fresh instance, each of the
+ * window-period setters, and that an instance built via {@code new StreamSortSpec(spec)} is
+ * equal to its source and reports the same hash code.
+ */
+public class StreamSortSpecTest {
+    @Test
+    public void testStreamSortSpec() {
+        StreamSortSpec spec = new StreamSortSpec();
+
+        // Values reported by a fresh instance.
+        Assert.assertEquals(30000, spec.getWindowMargin());
+        Assert.assertEquals("", spec.getWindowPeriod());
+        Assert.assertEquals(0, spec.getWindowPeriodMillis());
+        Assert.assertEquals("StreamSortSpec[windowPeriod=,windowMargin=30000]", spec.toString());
+
+        // Window period supplied as a string.
+        spec.setWindowPeriod("PT60S");
+        Assert.assertEquals(60000, spec.getWindowPeriodMillis());
+        Assert.assertEquals("PT60S", spec.getWindowPeriod());
+        Assert.assertEquals("StreamSortSpec[windowPeriod=PT60S,windowMargin=30000]", spec.toString());
+
+        spec.setWindowMargin(20);
+        Assert.assertEquals(20, spec.getWindowMargin());
+
+        // Window period supplied in milliseconds.
+        spec.setWindowPeriodMillis(50000);
+        Assert.assertEquals("StreamSortSpec[windowPeriod=PT50S,windowMargin=20]", spec.toString());
+
+        // Window period supplied as a Joda-Time Period.
+        spec.setWindowPeriod2(Period.minutes(10));
+        Assert.assertEquals("StreamSortSpec[windowPeriod=PT10M,windowMargin=20]", spec.toString());
+
+        // An instance built from the configured spec must be equal and hash identically.
+        Assert.assertEquals(spec, new StreamSortSpec(spec));
+        Assert.assertEquals(spec.hashCode(), new StreamSortSpec(spec).hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamingClusterTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamingClusterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamingClusterTest.java
new file mode 100644
index 0000000..2b79cf6
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamingClusterTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+/**
+ * Exercises {@link StreamingCluster}: two separately constructed clusters that were given the
+ * same property values are distinct instances and do not compare equal.
+ */
+public class StreamingClusterTest {
+
+    /** Builds a cluster with the fixed property values shared by both sides of the comparison. */
+    private static StreamingCluster newTestCluster() {
+        StreamingCluster cluster = new StreamingCluster();
+        cluster.setName("test");
+        cluster.setDeployments(new HashMap<>());
+        cluster.setDescription("setDescription");
+        cluster.setType(StreamingCluster.StreamingType.STORM);
+        cluster.setZone("setZone");
+        return cluster;
+    }
+
+    @Test
+    public void testStreamingCluster() {
+        StreamingCluster cluster = newTestCluster();
+        StreamingCluster cluster1 = newTestCluster();
+
+        Assert.assertNotSame(cluster, cluster1);
+        Assert.assertFalse(cluster.equals(cluster1));
+
+        // Deliberately no assertion that the two hash codes differ: the Object.hashCode contract
+        // allows unequal objects to share a hash code, so such a check can fail spuriously.
+        // Only the guaranteed property is checked - repeated calls on one object agree.
+        Assert.assertEquals(cluster.hashCode(), cluster.hashCode());
+        Assert.assertEquals(cluster1.hashCode(), cluster1.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
new file mode 100644
index 0000000..d534e3b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.*;
+
+/**
+ * Exercises {@link AlertPublishEvent#createAlertPublishEvent(AlertStreamEvent)}: the failure
+ * cases for incompletely populated stream events, and the field mapping for a fully populated
+ * event with and without extra data.
+ */
+public class AlertPublishEventTest {
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    /** Builds the eight-column schema (one column per exercised type) used by these tests. */
+    private static StreamDefinition createSchema() {
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+        streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+        streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setColumns(streamColumns);
+        return streamDefinition;
+    }
+
+    /** A stream event with neither schema nor data is expected to fail with a NullPointerException. */
+    @Test
+    public void testAlertPublishEvent() {
+        thrown.expect(NullPointerException.class);
+        AlertStreamEvent alertStreamEvent = new AlertStreamEvent();
+        AlertPublishEvent.createAlertPublishEvent(alertStreamEvent);
+    }
+
+    /** A stream event with a schema but no data is expected to fail with a NullPointerException. */
+    @Test
+    public void testAlertPublishEvent1() {
+        thrown.expect(NullPointerException.class);
+        AlertStreamEvent alertStreamEvent = new AlertStreamEvent();
+        alertStreamEvent.setSchema(createSchema());
+        AlertPublishEvent.createAlertPublishEvent(alertStreamEvent);
+    }
+
+    /** A fully populated stream event is mapped field by field, first without then with extra data. */
+    @Test
+    public void testAlertPublishEvent2() {
+        final String expectedAlertData = "{flag=1, data=0.1, name=namevalue, host=hostvalue, salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}";
+
+        AlertStreamEvent alertStreamEvent = new AlertStreamEvent();
+        alertStreamEvent.setData(new Object[]{"namevalue", "hostvalue", "1", 10, 0.1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", 1});
+        alertStreamEvent.setSchema(createSchema());
+        alertStreamEvent.setPolicyId("setPolicyId");
+        alertStreamEvent.setCreatedTime(1234);
+
+        // Without extra data the site id, policy value and app ids stay null.
+        AlertPublishEvent alertPublishEvent = AlertPublishEvent.createAlertPublishEvent(alertStreamEvent);
+        Assert.assertNull(alertPublishEvent.getSiteId());
+        Assert.assertNotNull(alertPublishEvent.getAlertId());
+        Assert.assertEquals("setPolicyId", alertPublishEvent.getPolicyId());
+        Assert.assertNull(alertPublishEvent.getPolicyValue());
+        Assert.assertEquals(expectedAlertData, alertPublishEvent.getAlertData().toString());
+        Assert.assertEquals(1234, alertPublishEvent.getAlertTimestamp());
+        Assert.assertNull(alertPublishEvent.getAppIds());
+
+        // A second conversion of the same stream event yields a distinct, non-equal publish event.
+        // Deliberately no assertion that the hash codes differ: the Object.hashCode contract
+        // allows unequal objects to share a hash code, so such a check can fail spuriously.
+        AlertPublishEvent alertPublishEvent1 = AlertPublishEvent.createAlertPublishEvent(alertStreamEvent);
+        Assert.assertNotSame(alertPublishEvent, alertPublishEvent1);
+        Assert.assertFalse(alertPublishEvent1.equals(alertPublishEvent));
+
+        Map<String, Object> extraData = new HashMap<>();
+        extraData.put(AlertPublishEvent.SITE_ID_KEY, "SITE_ID_KEY");
+        extraData.put(AlertPublishEvent.POLICY_VALUE_KEY, "POLICY_VALUE_KEY");
+        extraData.put(AlertPublishEvent.APP_IDS_KEY, Arrays.asList("appId1", "appId2"));
+        alertStreamEvent.setExtraData(extraData);
+
+        // With extra data present, the three keyed values surface on the publish event.
+        alertPublishEvent = AlertPublishEvent.createAlertPublishEvent(alertStreamEvent);
+        Assert.assertEquals("SITE_ID_KEY", alertPublishEvent.getSiteId());
+        Assert.assertNotNull(alertPublishEvent.getAlertId());
+        Assert.assertEquals("setPolicyId", alertPublishEvent.getPolicyId());
+        Assert.assertEquals("POLICY_VALUE_KEY", alertPublishEvent.getPolicyValue());
+        Assert.assertEquals(expectedAlertData, alertPublishEvent.getAlertData().toString());
+        Assert.assertEquals(1234, alertPublishEvent.getAlertTimestamp());
+        Assert.assertEquals("appId1", alertPublishEvent.getAppIds().get(0));
+        Assert.assertEquals("appId2", alertPublishEvent.getAppIds().get(1));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
new file mode 100644
index 0000000..eec3675
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Exercises {@link AlertStreamEvent}: its string form and data map, and that an instance built
+ * via {@code new AlertStreamEvent(event)} renders the same, is equal to its source and reports
+ * the same hash code.
+ */
+public class AlertStreamEventTest {
+
+    /** Shorthand for building a named column of the given type. */
+    private static StreamColumn column(String name, StreamColumn.Type type) {
+        return new StreamColumn.Builder().name(name).type(type).build();
+    }
+
+    @Test
+    public void testAlertStreamEvent() {
+        final String expectedText = "AlertStreamEvent[stream=NULL,timestamp=1970-01-01 00:00:00,000,data=[namevalue,hostvalue,1,10,0.1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},1], policyId=null, createdBy=null, metaVersion=null]";
+        final String expectedDataMap = "{flag=1, data=0.1, name=namevalue, host=hostvalue, salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}";
+
+        List<StreamColumn> columns = new ArrayList<>(Arrays.asList(
+            column("name", StreamColumn.Type.STRING),
+            column("host", StreamColumn.Type.STRING),
+            column("flag", StreamColumn.Type.BOOL),
+            column("value", StreamColumn.Type.DOUBLE),
+            column("data", StreamColumn.Type.LONG),
+            column("salary", StreamColumn.Type.FLOAT),
+            column("object", StreamColumn.Type.OBJECT),
+            column("int", StreamColumn.Type.INT)));
+        StreamDefinition schema = new StreamDefinition();
+        schema.setColumns(columns);
+
+        AlertStreamEvent original = new AlertStreamEvent();
+        original.setSchema(schema);
+        original.setData(new Object[]{"namevalue", "hostvalue", "1", 10, 0.1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", 1});
+        Assert.assertEquals(expectedText, original.toString());
+        Assert.assertEquals(expectedDataMap, original.getDataMap().toString());
+
+        // The instance built from the original must render exactly like it.
+        AlertStreamEvent duplicate = new AlertStreamEvent(original);
+        Assert.assertEquals(expectedText, duplicate.toString());
+        Assert.assertEquals(expectedDataMap, duplicate.getDataMap().toString());
+
+        // Distinct instance, but equal with an identical hash code.
+        Assert.assertNotSame(original, duplicate);
+        Assert.assertEquals(duplicate, original);
+        Assert.assertEquals(duplicate.hashCode(), original.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/PartitionedEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/PartitionedEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/PartitionedEventTest.java
new file mode 100644
index 0000000..719445b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/PartitionedEventTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Exercises {@link PartitionedEvent}: its string form when empty and when populated, and that
+ * {@code copy()} returns a distinct instance that is equal to the source and reports the same
+ * hash code.
+ */
+public class PartitionedEventTest {
+    @Test
+    public void testPartitionedEvent() {
+        // An empty event renders with unset fields (the string has no closing bracket).
+        PartitionedEvent partitionedEvent = new PartitionedEvent();
+        Assert.assertEquals("PartitionedEvent[partition=null,event=null,key=0", partitionedEvent.toString());
+
+        Object[] data = new Object[]{"namevalue", "hostvalue", "1", 10, 0.1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", 1};
+        // Upper-case 'L' suffix: a lower-case 'l' is easily misread as the digit 1.
+        StreamEvent streamEvent = new StreamEvent("streamId", 1478667686971L, data);
+
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+        streamSortSpec.setWindowPeriod("PT10S");
+        StreamPartition streamPartition = new StreamPartition();
+        List<String> columns = new ArrayList<>();
+        columns.add("jobId");
+        streamPartition.setColumns(columns);
+        streamPartition.setSortSpec(streamSortSpec);
+        streamPartition.setStreamId("test");
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+
+        partitionedEvent = new PartitionedEvent(streamEvent, streamPartition, 1);
+        Assert.assertEquals("PartitionedEvent[partition=StreamPartition[streamId=test,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]],event=StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,hostvalue,1,10,0.1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},1],metaVersion=null],key=1", partitionedEvent.toString());
+
+        // copy() must hand back a different object that is equal and hashes identically.
+        PartitionedEvent partitionedEventCopy = partitionedEvent.copy();
+        Assert.assertNotSame(partitionedEvent, partitionedEventCopy);
+        Assert.assertEquals(partitionedEventCopy, partitionedEvent);
+        Assert.assertEquals(partitionedEventCopy.hashCode(), partitionedEvent.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventBuilderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventBuilderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventBuilderTest.java
new file mode 100644
index 0000000..16c7ad2
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventBuilderTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Exercises {@link StreamEventBuilder}: building events from positional data and from
+ * attribute maps, the failure when no stream id is supplied, and how map attributes are
+ * projected onto a schema (extra keys, default values, missing keys).
+ */
+public class StreamEventBuilderTest {
+
+    /** Timestamp shared by every event built here; it renders as 2016-11-09 05:01:26,971. */
+    private static final long EVENT_TIMESTAMP = 1478667686971L;
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    /** Shorthand for building a named column of the given type. */
+    private static StreamColumn column(String name, StreamColumn.Type type) {
+        return new StreamColumn.Builder().name(name).type(type).build();
+    }
+
+    /** Builds a stream definition whose columns are the given ones, in the given order. */
+    private static StreamDefinition schemaOf(StreamColumn... columns) {
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        for (StreamColumn streamColumn : columns) {
+            streamColumns.add(streamColumn);
+        }
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setColumns(streamColumns);
+        return streamDefinition;
+    }
+
+    /** Attribute map holding the seven keys shared by several tests (everything except "int"). */
+    private static Map<String, Object> commonAttributes() {
+        Map<String, Object> mapdata = new HashMap<>();
+        mapdata.put("name", "namevalue");
+        mapdata.put("host", "hostvalue");
+        mapdata.put("flag", "1");
+        mapdata.put("value", 10.0);
+        mapdata.put("data", 1);
+        mapdata.put("salary", -0.2);
+        mapdata.put("object", "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}");
+        return mapdata;
+    }
+
+    /** Builds events from an array and from a map against the full eight-column schema. */
+    @Test
+    public void testStreamEventBuilder() {
+        final String expectedFullEvent = "StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,hostvalue,1,10.0,1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},4],metaVersion=metaVersion]";
+
+        StreamDefinition streamDefinition = schemaOf(
+            column("name", StreamColumn.Type.STRING),
+            column("host", StreamColumn.Type.STRING),
+            column("flag", StreamColumn.Type.BOOL),
+            column("value", StreamColumn.Type.DOUBLE),
+            column("data", StreamColumn.Type.LONG),
+            column("salary", StreamColumn.Type.FLOAT),
+            column("object", StreamColumn.Type.OBJECT),
+            column("int", StreamColumn.Type.INT));
+
+        // No attributes yet: the event renders with an empty data list.
+        StreamEventBuilder streamEventBuilder = new StreamEventBuilder();
+        StreamEvent streamEvent = streamEventBuilder.schema(streamDefinition).streamId("streamId").metaVersion("metaVersion").timestamep(EVENT_TIMESTAMP).build();
+        Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[],metaVersion=metaVersion]", streamEvent.toString());
+
+        // Attributes supplied positionally.
+        Object[] data = new Object[]{"namevalue", "hostvalue", "1", 10.0, 1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", 4};
+        streamEvent = streamEventBuilder.schema(streamDefinition).attributes(data).streamId("streamId").metaVersion("metaVersion").timestamep(EVENT_TIMESTAMP).build();
+        Assert.assertEquals(expectedFullEvent, streamEvent.toString());
+
+        // Attributes supplied by name; check the event that was just built.
+        Map<String, Object> mapdata = commonAttributes();
+        mapdata.put("int", 4);
+        StreamEvent streamEvent1 = streamEventBuilder.schema(streamDefinition).attributes(mapdata, streamDefinition).streamId("streamId").metaVersion("metaVersion").timestamep(EVENT_TIMESTAMP).build();
+        Assert.assertEquals(expectedFullEvent, streamEvent1.toString());
+
+        // Building twice from one builder is expected to hand back the very same instance.
+        Assert.assertSame(streamEvent, streamEvent1);
+        Assert.assertEquals(streamEvent1, streamEvent);
+        Assert.assertEquals(streamEvent1.hashCode(), streamEvent.hashCode());
+
+        // A separate builder yields a distinct instance that is still equal.
+        StreamEventBuilder streamEventBuilder1 = new StreamEventBuilder();
+        streamEvent1 = streamEventBuilder1.schema(streamDefinition).attributes(mapdata, streamDefinition).streamId("streamId").metaVersion("metaVersion").timestamep(EVENT_TIMESTAMP).build();
+
+        Assert.assertNotSame(streamEvent, streamEvent1);
+        Assert.assertEquals(streamEvent1, streamEvent);
+        Assert.assertEquals(streamEvent1.hashCode(), streamEvent.hashCode());
+    }
+
+    /** Building without a stream id is expected to fail with an IllegalArgumentException. */
+    @Test
+    public void testStreamEventBuilder1() {
+        thrown.expect(IllegalArgumentException.class);
+        StreamEventBuilder streamEventBuilder = new StreamEventBuilder();
+        streamEventBuilder.metaVersion("metaVersion").timestamep(EVENT_TIMESTAMP).build();
+    }
+
+    /** Map keys that have no column in the schema do not appear in the event data. */
+    @Test
+    public void testStreamEventBuilder2() {
+        StreamEventBuilder streamEventBuilder = new StreamEventBuilder();
+
+        Map<String, Object> mapdata = commonAttributes();
+        mapdata.put("int", 4);
+
+        StreamDefinition streamDefinition = schemaOf(
+            column("name", StreamColumn.Type.STRING),
+            column("host", StreamColumn.Type.STRING),
+            column("int", StreamColumn.Type.INT));
+
+        StreamEvent streamEvent = streamEventBuilder.attributes(mapdata, streamDefinition).streamId("streamId").metaVersion("metaVersion").timestamep(EVENT_TIMESTAMP).build();
+        Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,hostvalue,4],metaVersion=metaVersion]", streamEvent.toString());
+    }
+
+    /** A column missing from the map takes the column's default value (100 for "int" here). */
+    @Test
+    public void testStreamEventBuilder3() {
+        StreamEventBuilder streamEventBuilder = new StreamEventBuilder();
+
+        Map<String, Object> mapdata = commonAttributes();
+
+        StreamColumn intColumn = column("int", StreamColumn.Type.INT);
+        intColumn.setDefaultValue(100);
+        StreamDefinition streamDefinition = schemaOf(
+            column("name", StreamColumn.Type.STRING),
+            column("host", StreamColumn.Type.STRING),
+            column("flag", StreamColumn.Type.BOOL),
+            column("value", StreamColumn.Type.DOUBLE),
+            column("data", StreamColumn.Type.LONG),
+            column("salary", StreamColumn.Type.FLOAT),
+            column("object", StreamColumn.Type.OBJECT),
+            intColumn);
+
+        StreamEvent streamEvent = streamEventBuilder.attributes(mapdata, streamDefinition).streamId("streamId").metaVersion("metaVersion").timestamep(EVENT_TIMESTAMP).build();
+        Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,hostvalue,1,10.0,1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},100],metaVersion=metaVersion]", streamEvent.toString());
+    }
+
+    /** Columns with neither a map entry nor a default render as empty slots in the data list. */
+    @Test
+    public void testStreamEventBuilder4() {
+        StreamEventBuilder streamEventBuilder = new StreamEventBuilder();
+
+        Map<String, Object> mapdata = new HashMap<>();
+        mapdata.put("name", "namevalue");
+        // "host1" intentionally does not match the schema's "host" column.
+        mapdata.put("host1", "hostvalue");
+        mapdata.put("flag", "1");
+
+        StreamDefinition streamDefinition = schemaOf(
+            column("name", StreamColumn.Type.STRING),
+            column("host", StreamColumn.Type.STRING),
+            column("flag", StreamColumn.Type.BOOL),
+            column("value", StreamColumn.Type.DOUBLE));
+
+        StreamEvent streamEvent = streamEventBuilder.attributes(mapdata, streamDefinition).streamId("streamId").metaVersion("metaVersion").timestamep(EVENT_TIMESTAMP).build();
+        Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,,1,],metaVersion=metaVersion]", streamEvent.toString());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java
new file mode 100644
index 0000000..6543a8d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+public class StreamEventTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testStreamEvent() {
+ Object[] data = new Object[]{"namevalue", "hostvalue", "1", 10.0, 1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", 4};
+ StreamEvent streamEvent = new StreamEvent("streamId", 1478667686971l, data);
+
+ Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,hostvalue,1,10.0,1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},4],metaVersion=null]", streamEvent.toString());
+
+ streamEvent = new StreamEvent("streamId", 1478667686971l, data, "metaVersion");
+
+ Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,hostvalue,1,10.0,1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},4],metaVersion=metaVersion]", streamEvent.toString());
+ StreamEvent streamEventCopy = streamEvent.copy();
+ Assert.assertFalse(streamEventCopy == streamEvent);
+ Assert.assertTrue(streamEventCopy.equals(streamEvent));
+ Assert.assertTrue(streamEventCopy.hashCode() == streamEvent.hashCode());
+
+ streamEventCopy.setMetaVersion("");
+ Assert.assertFalse(streamEventCopy == streamEvent);
+ Assert.assertFalse(streamEventCopy.equals(streamEvent));
+ Assert.assertFalse(streamEventCopy.hashCode() == streamEvent.hashCode());
+
+ streamEventCopy.copyFrom(streamEvent);
+
+ Assert.assertFalse(streamEventCopy == streamEvent);
+ Assert.assertTrue(streamEventCopy.equals(streamEvent));
+ Assert.assertTrue(streamEventCopy.hashCode() == streamEvent.hashCode());
+
+
+ List<StreamColumn> streamColumns = new ArrayList<>();
+ streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+ streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+ streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+ streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+ streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+ streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+ streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+ streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+
+ StreamDefinition streamDefinition = new StreamDefinition();
+ streamDefinition.setColumns(streamColumns);
+
+ Object[] values = streamEvent.getData(streamDefinition, "int", "salary", "flag", "object");
+ Assert.assertEquals(4, values[0]);
+ Assert.assertEquals(-0.2, values[1]);
+ Assert.assertEquals("1", values[2]);
+ Assert.assertEquals("{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", values[3]);
+
+ values = streamEvent.getData(streamDefinition, Arrays.asList("int", "data", "flag", "object"));
+ Assert.assertEquals(4, values[0]);
+ Assert.assertEquals(1, values[1]);
+ Assert.assertEquals("1", values[2]);
+ Assert.assertEquals("{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", values[3]);
+ }
+
+
+ @Test
+ public void testStreamEvent1() {
+ thrown.expect(IndexOutOfBoundsException.class);
+ List<StreamColumn> streamColumns = new ArrayList<>();
+ streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+ streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+ streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+ streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+ streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+ streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+ streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+ streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+
+ StreamDefinition streamDefinition = new StreamDefinition();
+ streamDefinition.setColumns(streamColumns);
+ StreamEvent streamEvent = new StreamEvent();
+ streamEvent.setData(new Object[]{"namevalue", "hostvalue", "1", 10.0, 1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}\"", 4});
+ streamEvent.getData(streamDefinition, "salary", "isYhd");
+
+ }
+
+ @Test
+ public void testStreamEvent2() {
+ thrown.expect(IndexOutOfBoundsException.class);
+ List<StreamColumn> streamColumns = new ArrayList<>();
+ streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+ streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+ streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+ streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+ streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+ streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+ streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+ streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+
+ StreamDefinition streamDefinition = new StreamDefinition();
+ streamDefinition.setColumns(streamColumns);
+ StreamEvent streamEvent = new StreamEvent();
+ streamEvent.setData(new Object[]{"namevalue", "hostvalue", "1", 10.0, 1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}\""});
+ streamEvent.getData(streamDefinition, "salary", "int");
+
+ }
+
+ @Test
+ public void testStreamEvent3() {
+ List<StreamColumn> streamColumns = new ArrayList<>();
+ streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+ streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+ streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+ streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+ streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+ streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+ streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+ streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+
+ StreamDefinition streamDefinition = new StreamDefinition();
+ streamDefinition.setColumns(streamColumns);
+ StreamEvent streamEvent = new StreamEvent();
+ streamEvent.setData(new Object[]{"namevalue", 1, "flag", 10.0, 0.1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}\"", 1});
+ Object[] values = streamEvent.getData(streamDefinition, "value", "host");
+ Assert.assertEquals(10.0, values[0]);
+ Assert.assertEquals(1, values[1]);
+ }
+
+ @Test
+ public void testStreamEventEqual() {
+ Long timestamp = System.currentTimeMillis();
+ StreamEvent event1 = mockSimpleStreamEvent(timestamp);
+ StreamEvent event2 = mockSimpleStreamEvent(timestamp);
+ StreamEvent event3 = event2.copy();
+ Assert.assertEquals(event1, event2);
+ Assert.assertEquals(event2, event3);
+ }
+
+ private static StreamEvent mockSimpleStreamEvent(Long timestamp) {
+ return StreamEvent.builder()
+ .schema(mockStreamDefinition("sampleStream_1"))
+ .streamId("sampleStream_1")
+ .timestamep(timestamp)
+ .attributes(new HashMap<String, Object>() {{
+ put("name", "cpu");
+ put("value", 60.0);
+ put("unknown", "unknown column value");
+ }}).build();
+ }
+
+ private static StreamDefinition mockStreamDefinition(String streamId) {
+ StreamDefinition sampleStreamDefinition = new StreamDefinition();
+ List<StreamColumn> streamColumns = new ArrayList<>();
+ streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+ streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+ streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+ streamColumns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
+ streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+
+ sampleStreamDefinition.setStreamId(streamId);
+ sampleStreamDefinition.setTimeseries(true);
+ sampleStreamDefinition.setValidate(true);
+ sampleStreamDefinition.setDescription("Schema for " + streamId);
+ sampleStreamDefinition.setColumns(streamColumns);
+ return sampleStreamDefinition;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/StreamEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/StreamEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/StreamEventTest.java
deleted file mode 100644
index e8ec1ad..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/StreamEventTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.model;
-
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-public class StreamEventTest {
- @Test
- public void testStreamEventEqual() {
- Long timestamp = System.currentTimeMillis();
- StreamEvent event1 = mockSimpleStreamEvent(timestamp);
- StreamEvent event2 = mockSimpleStreamEvent(timestamp);
- StreamEvent event3 = event2.copy();
- Assert.assertEquals(event1, event2);
- Assert.assertEquals(event2, event3);
- }
-
- private static StreamEvent mockSimpleStreamEvent(Long timestamp) {
- return StreamEvent.builder()
- .schema(mockStreamDefinition("sampleStream_1"))
- .streamId("sampleStream_1")
- .timestamep(timestamp)
- .attributes(new HashMap<String, Object>() {{
- put("name", "cpu");
- put("value", 60.0);
- put("unknown", "unknown column value");
- }}).build();
- }
-
- private static StreamDefinition mockStreamDefinition(String streamId) {
- StreamDefinition sampleStreamDefinition = new StreamDefinition();
- List<StreamColumn> streamColumns = new ArrayList<>();
- streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
- streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
- streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
- streamColumns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
- streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
-
- sampleStreamDefinition.setStreamId(streamId);
- sampleStreamDefinition.setTimeseries(true);
- sampleStreamDefinition.setValidate(true);
- sampleStreamDefinition.setDescription("Schema for " + streamId);
- sampleStreamDefinition.setColumns(streamColumns);
- return sampleStreamDefinition;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java
deleted file mode 100644
index 1cc063d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.model;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestPolicyDefinition {
-
- @Test
- public void testEqual() {
- PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
- PolicyDefinition policy1 = new PolicyDefinition();
- policy1.setName("policy1");
- policy1.setDefinition(definition);
-
- PolicyDefinition policy2 = new PolicyDefinition();
- policy2.setName("policy1");
- policy2.setPolicyStatus(PolicyDefinition.PolicyStatus.DISABLED);
- policy2.setDefinition(definition);
-
- PolicyDefinition policy3 = new PolicyDefinition();
- policy3.setName("policy1");
- policy3.setDefinition(definition);
-
- Assert.assertTrue(policy1.equals(policy3));
- Assert.assertFalse(policy1.equals(policy2));
- }
-}
[2/2] incubator-eagle git commit: [EAGLE-749] Add unit test for model.
Posted by ha...@apache.org.
[EAGLE-749] Add unit test for model.
- Add unit tests for the model classes under the alert-common module.
- Fix equals()/hashCode() inconsistencies in PartitionedEvent, StreamEvent, and Publishment.
- Fix bug in StreamColumn where a BOOL column's default value was parsed with Double.valueOf instead of Boolean.valueOf.
https://issues.apache.org/jira/browse/EAGLE-749
Author: r7raul1984 <ta...@yhd.com>
Closes #632 from r7raul1984/EAGLE-749.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/2b61cef5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/2b61cef5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/2b61cef5
Branch: refs/heads/master
Commit: 2b61cef585b02fb6f40d5c3de7f7c5c060926ecd
Parents: 1da8dc4
Author: r7raul1984 <ta...@yhd.com>
Authored: Wed Nov 9 21:55:51 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Nov 9 21:55:51 2016 +0800
----------------------------------------------------------------------
.../alert/engine/coordinator/Publishment.java | 21 +-
.../alert/engine/coordinator/StreamColumn.java | 2 +-
.../engine/coordinator/StreamSortSpec.java | 17 +-
.../alert/engine/model/PartitionedEvent.java | 16 +-
.../eagle/alert/engine/model/StreamEvent.java | 11 +-
.../eagle/alert/config/TestConfigBus.java | 5 +-
.../model/Kafka2TupleMetadataTest.java | 49 +++++
.../model/PolicyWorkerQueueTest.java | 63 ++++++
.../model/StreamRepartitionStrategyTest.java | 74 +++++++
.../model/StreamRouterSpecTest.java | 59 ++++++
.../model/Tuple2StreamMetadataTest.java | 50 +++++
.../alert/coordination/model/WorkSlotTest.java | 43 +++++
.../model/internal/MonitoredStreamTest.java | 69 +++++++
.../model/internal/PolicyAssignmentTest.java | 37 ++++
.../model/internal/StreamGroupTest.java | 67 +++++++
.../model/internal/StreamWorkSlotQueueTest.java | 61 ++++++
.../model/internal/TopologyTest.java | 47 +++++
.../OverrideDeduplicatorSpecTest.java | 61 ++++++
.../coordinator/PolicyDefinitionTest.java | 140 ++++++++++++++
.../engine/coordinator/PublishmentTest.java | 100 ++++++++++
.../engine/coordinator/PublishmentTypeTest.java | 47 +++++
.../engine/coordinator/StreamColumnTest.java | 153 +++++++++++++++
.../coordinator/StreamDefinitionTest.java | 52 +++++
.../engine/coordinator/StreamPartitionTest.java | 43 +++++
.../engine/coordinator/StreamSortSpecTest.java | 45 +++++
.../coordinator/StreamingClusterTest.java | 47 +++++
.../engine/model/AlertPublishEventTest.java | 110 +++++++++++
.../engine/model/AlertStreamEventTest.java | 63 ++++++
.../engine/model/PartitionedEventTest.java | 54 ++++++
.../engine/model/StreamEventBuilderTest.java | 166 ++++++++++++++++
.../alert/engine/model/StreamEventTest.java | 191 +++++++++++++++++++
.../eagle/alert/model/StreamEventTest.java | 68 -------
.../eagle/alert/model/TestPolicyDefinition.java | 45 -----
33 files changed, 1930 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index dbb1844..d1cc33a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -135,28 +135,29 @@ public class Publishment {
if (obj instanceof Publishment) {
Publishment p = (Publishment) obj;
return (Objects.equals(name, p.getName()) && Objects.equals(type, p.getType())
- && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
- && Objects.equals(dedupFields, p.getDedupFields())
- && Objects.equals(dedupStateField, p.getDedupStateField())
- && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator())
- && Objects.equals(policyIds, p.getPolicyIds())
- && Objects.equals(streamIds, p.getStreamIds())
- && properties.equals(p.getProperties()));
+ && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
+ && Objects.equals(dedupFields, p.getDedupFields())
+ && Objects.equals(dedupStateField, p.getDedupStateField())
+ && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator())
+ && Objects.equals(policyIds, p.getPolicyIds())
+ && Objects.equals(streamIds, p.getStreamIds())
+ && properties.equals(p.getProperties()));
}
return false;
}
@Override
public int hashCode() {
- return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(policyIds)
- .append(properties).build();
+ return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(dedupFields)
+ .append(dedupStateField).append(overrideDeduplicator).append(policyIds).append(streamIds)
+ .append(properties).build();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:")
- .append(policyIds).append(",properties:").append(properties);
+ .append(policyIds).append(",properties:").append(properties);
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index 5a5f2cc..4628043 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -85,7 +85,7 @@ public class StreamColumn implements Serializable {
this.setDefaultValue(Double.valueOf((String) this.getDefaultValue()));
break;
case BOOL:
- this.setDefaultValue(Double.valueOf((String) this.getDefaultValue()));
+ this.setDefaultValue(Boolean.valueOf((String) this.getDefaultValue()));
break;
case OBJECT:
try {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
index 65b9151..ff05fc8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
@@ -16,6 +16,7 @@
*/
package org.apache.eagle.alert.engine.coordinator;
+import org.apache.commons.lang.StringUtils;
import org.apache.eagle.alert.utils.TimePeriodUtils;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.commons.lang.builder.HashCodeBuilder;
@@ -45,7 +46,7 @@ public class StreamSortSpec implements Serializable {
}
public int getWindowPeriodMillis() {
- if (windowPeriod != null) {
+ if (StringUtils.isNotBlank(windowPeriod)) {
return TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(windowPeriod));
} else {
return 0;
@@ -76,9 +77,9 @@ public class StreamSortSpec implements Serializable {
@Override
public int hashCode() {
return new HashCodeBuilder()
- .append(windowPeriod)
- .append(windowMargin)
- .toHashCode();
+ .append(windowPeriod)
+ .append(windowMargin)
+ .toHashCode();
}
@Override
@@ -92,14 +93,14 @@ public class StreamSortSpec implements Serializable {
StreamSortSpec another = (StreamSortSpec) that;
return
- another.windowPeriod.equals(this.windowPeriod)
- && another.windowMargin == this.windowMargin;
+ another.windowPeriod.equals(this.windowPeriod)
+ && another.windowMargin == this.windowMargin;
}
@Override
public String toString() {
return String.format("StreamSortSpec[windowPeriod=%s,windowMargin=%d]",
- this.getWindowPeriod(),
- this.getWindowMargin());
+ this.getWindowPeriod(),
+ this.getWindowMargin());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
index 51e4532..ecca0ff 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
@@ -19,6 +19,7 @@ package org.apache.eagle.alert.engine.model;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import backtype.storm.tuple.Tuple;
import org.apache.commons.lang3.builder.HashCodeBuilder;
+
import java.io.Serializable;
import java.util.Objects;
@@ -63,9 +64,9 @@ public class PartitionedEvent implements Serializable {
if (obj instanceof PartitionedEvent) {
PartitionedEvent another = (PartitionedEvent) obj;
return !(this.partitionKey != another.getPartitionKey()
- || !Objects.equals(this.event, another.getEvent())
- || !Objects.equals(this.partition, another.getPartition())
- || !Objects.equals(this.anchor, another.anchor));
+ || !Objects.equals(this.event, another.getEvent())
+ || !Objects.equals(this.partition, another.getPartition())
+ || !Objects.equals(this.anchor, another.anchor));
} else {
return false;
}
@@ -74,10 +75,11 @@ public class PartitionedEvent implements Serializable {
@Override
public int hashCode() {
return new HashCodeBuilder()
- .append(partitionKey)
- .append(event)
- .append(partition)
- .build();
+ .append(partitionKey)
+ .append(event)
+ .append(partition)
+ .append(anchor)
+ .build();
}
public StreamEvent getEvent() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
index 8480bc5..130985f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
@@ -95,7 +95,8 @@ public class StreamEvent implements Serializable {
}
if (obj instanceof StreamEvent) {
StreamEvent another = (StreamEvent) obj;
- return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp && Arrays.deepEquals(this.data, another.data);
+ return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp
+ && Arrays.deepEquals(this.data, another.data) && Objects.equals(this.metaVersion, another.metaVersion);
}
return false;
}
@@ -113,10 +114,10 @@ public class StreamEvent implements Serializable {
}
}
return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]",
- this.getStreamId(),
- DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
- StringUtils.join(dataStrings, ","),
- this.getMetaVersion());
+ this.getStreamId(),
+ DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
+ StringUtils.join(dataStrings, ","),
+ this.getMetaVersion());
}
public static StreamEventBuilder builder() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
index 5c3f35e..e37e9be 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
@@ -16,6 +16,7 @@
*/
package org.apache.eagle.alert.config;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -69,8 +70,8 @@ public class TestConfigBus {
}
@After
- public void shutdown() {
- CloseableUtils.closeQuietly(server);
+ public void shutdown() throws IOException {
+ server.stop();
producer.close();
consumer.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java
new file mode 100644
index 0000000..a252fae
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Verifies the {@code equals}/{@code hashCode} behaviour of {@link Kafka2TupleMetadata}. */
+public class Kafka2TupleMetadataTest {
+    @Test
+    public void testKafka2TupleMetadata() {
+        Kafka2TupleMetadata first = newMetadata();
+        Kafka2TupleMetadata second = newMetadata();
+        // Separately built, identically configured instances: distinct objects, yet equal.
+        Assert.assertNotSame(second, first);
+        Assert.assertEquals(second, first);
+        Assert.assertEquals(second.hashCode(), first.hashCode());
+        // Changing a single field must break equality.
+        second.setType("setType1");
+        Assert.assertFalse(second.equals(first));
+        // NOTE(review): differing hash codes are not required by the hashCode contract.
+        Assert.assertFalse(second.hashCode() == first.hashCode());
+    }
+    /** Creates a populated instance; every call sets the same field values. */
+    private static Kafka2TupleMetadata newMetadata() {
+        Kafka2TupleMetadata metadata = new Kafka2TupleMetadata();
+        metadata.setName("setName");
+        metadata.setCodec(new Tuple2StreamMetadata());
+        metadata.setType("setType");
+        metadata.setTopic("setTopic");
+        metadata.setSchemeCls("org.apache.eagle.alert.engine.scheme.PlainStringScheme");
+        return metadata;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java
new file mode 100644
index 0000000..71f3188
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Exercises the accessors, {@code toString} and equality of {@link PolicyWorkerQueue}. */
+public class PolicyWorkerQueueTest {
+    @Test
+    public void testPolicyWorkerQueue() {
+        WorkSlot firstSlot = new WorkSlot("setTopologyName1", "setBoltId1");
+        WorkSlot secondSlot = new WorkSlot("setTopologyName1", "setBoltId2");
+        List<WorkSlot> slots = new ArrayList<>();
+        slots.add(firstSlot);
+        slots.add(secondSlot);
+        // A queue built from workers only has no partition and returns the workers in order.
+        PolicyWorkerQueue fromConstructor = new PolicyWorkerQueue(slots);
+        Assert.assertNull(fromConstructor.getPartition());
+        Assert.assertEquals(firstSlot, fromConstructor.getWorkers().get(0));
+        Assert.assertEquals(secondSlot, fromConstructor.getWorkers().get(1));
+        Assert.assertEquals("[(setTopologyName1:setBoltId1),(setTopologyName1:setBoltId2)]", fromConstructor.toString());
+        // Supplying the same workers through the setter yields an equal queue.
+        PolicyWorkerQueue fromSetter = new PolicyWorkerQueue();
+        fromSetter.setWorkers(slots);
+        Assert.assertEquals(fromConstructor, fromSetter);
+        Assert.assertEquals(fromConstructor.hashCode(), fromSetter.hashCode());
+
+        // Once only one of the two queues carries a partition they stop being equal.
+        StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowPeriod("PT10S");
+        List<String> groupColumns = new ArrayList<>();
+        groupColumns.add("jobId");
+        StreamPartition partition = new StreamPartition();
+        partition.setColumns(groupColumns);
+        partition.setSortSpec(sortSpec);
+        partition.setStreamId("test");
+        partition.setType(StreamPartition.Type.GROUPBY);
+        fromSetter.setPartition(partition);
+        Assert.assertFalse(fromConstructor.equals(fromSetter));
+        Assert.assertFalse(fromConstructor.hashCode() == fromSetter.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java
new file mode 100644
index 0000000..c416a49
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Exercises {@link StreamRepartitionStrategy} defaults, equality and hashing. */
+public class StreamRepartitionStrategyTest {
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    /** {@code hashCode()} on a freshly constructed instance is expected to throw. */
+    @Test
+    public void testStreamRepartitionStrategy() {
+        thrown.expect(NullPointerException.class);
+        new StreamRepartitionStrategy().hashCode();
+    }
+
+    /** {@code equals()} on a freshly constructed instance is expected to throw, even against itself. */
+    @Test
+    public void testStreamRepartitionStrategy1() {
+        thrown.expect(NullPointerException.class);
+        StreamRepartitionStrategy unconfigured = new StreamRepartitionStrategy();
+        unconfigured.equals(unconfigured);
+    }
+
+    /** Checks the initial getter values, then equality of two instances sharing one partition. */
+    @Test
+    public void testStreamRepartitionStrategy2() {
+        StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowPeriod("PT10S");
+        List<String> groupColumns = new ArrayList<>();
+        groupColumns.add("jobId");
+        StreamPartition partition = new StreamPartition();
+        partition.setColumns(groupColumns);
+        partition.setSortSpec(sortSpec);
+        partition.setStreamId("test");
+        partition.setType(StreamPartition.Type.GROUPBY);
+
+        StreamRepartitionStrategy left = new StreamRepartitionStrategy();
+        Assert.assertNull(left.getPartition());
+        Assert.assertEquals(0, left.getNumTotalParticipatingRouterBolts());
+        Assert.assertEquals(0, left.getStartSequence());
+        left.setPartition(partition);
+
+        StreamRepartitionStrategy right = new StreamRepartitionStrategy();
+        right.setPartition(partition);
+        Assert.assertEquals(left, right);
+        Assert.assertEquals(left.hashCode(), right.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java
new file mode 100644
index 0000000..88e72cb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Exercises the defaults, queue handling and equality of {@link StreamRouterSpec}. */
+public class StreamRouterSpecTest {
+    @Test
+    public void testStreamRouterSpec() {
+        // A new spec has neither partition nor stream id, and no target queues.
+        StreamRouterSpec spec = new StreamRouterSpec();
+        Assert.assertNull(spec.getPartition());
+        Assert.assertNull(spec.getStreamId());
+        Assert.assertTrue(spec.getTargetQueue().isEmpty());
+
+        List<WorkSlot> slots = new ArrayList<>();
+        slots.add(new WorkSlot("setTopologyName1", "setBoltId1"));
+        slots.add(new WorkSlot("setTopologyName1", "setBoltId2"));
+        PolicyWorkerQueue queue = new PolicyWorkerQueue(slots);
+        spec.addQueue(queue);
+        spec.setStreamId("streamRouterSpec");
+
+        // The added queue and the stream id are visible through the getters.
+        Assert.assertEquals("streamRouterSpec", spec.getStreamId());
+        Assert.assertEquals(1, spec.getTargetQueue().size());
+        Assert.assertEquals(2, spec.getTargetQueue().get(0).getWorkers().size());
+
+        // Same queue but another stream id: not equal.
+        StreamRouterSpec other = new StreamRouterSpec();
+        other.addQueue(queue);
+        other.setStreamId("streamRouterSpec1");
+        Assert.assertFalse(spec.equals(other));
+
+        // Aligning the stream id makes both specs equal with matching hash codes.
+        other.setStreamId("streamRouterSpec");
+        Assert.assertEquals(spec, other);
+        Assert.assertEquals(spec.hashCode(), other.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java
new file mode 100644
index 0000000..8bbfc41
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+/** Documents that identically configured {@link Tuple2StreamMetadata} instances are not equal. */
+public class Tuple2StreamMetadataTest {
+    @Test
+    public void testTuple2StreamMetadata() {
+        Tuple2StreamMetadata metadata = newMetadata();
+        Tuple2StreamMetadata metadata1 = newMetadata();
+        // Same configuration on both sides, yet the instances compare as different.
+        Assert.assertNotSame(metadata, metadata1);
+        Assert.assertFalse(metadata.equals(metadata1));
+        // NOTE(review): differing hash codes are not required by the hashCode contract.
+        Assert.assertFalse(metadata.hashCode() == metadata1.hashCode());
+    }
+    /** Creates a populated instance; every call sets the same field values. */
+    private static Tuple2StreamMetadata newMetadata() {
+        // Parameterized instead of a raw Set so add() and the setter call are type-checked.
+        Set<String> activeStreamNames = new HashSet<>();
+        activeStreamNames.add("defaultStringStream");
+        Tuple2StreamMetadata metadata = new Tuple2StreamMetadata();
+        metadata.setStreamNameSelectorCls("org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector");
+        metadata.setStreamNameSelectorProp(new Properties());
+        metadata.getStreamNameSelectorProp().put("userProvidedStreamName", "defaultStringStream");
+        metadata.setActiveStreamNames(activeStreamNames);
+        metadata.setTimestampColumn("timestamp");
+        return metadata;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java
new file mode 100644
index 0000000..48ee73b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Checks that {@link WorkSlot} built via setters and via the two-argument constructor agree. */
+public class WorkSlotTest {
+    @Test
+    public void testWorkSlot() {
+        WorkSlot viaSetters = new WorkSlot();
+        Assert.assertNull(viaSetters.getTopologyName());
+        Assert.assertNull(viaSetters.getBoltId());
+        Assert.assertEquals("(null:null)", viaSetters.toString());
+        viaSetters.setTopologyName("setTopologyName");
+        viaSetters.setBoltId("setBoltId");
+        Assert.assertEquals("setTopologyName", viaSetters.getTopologyName());
+        Assert.assertEquals("setBoltId", viaSetters.getBoltId());
+        Assert.assertEquals("(setTopologyName:setBoltId)", viaSetters.toString());
+        WorkSlot viaConstructor = new WorkSlot("setTopologyName", "setBoltId");
+        Assert.assertEquals("setTopologyName", viaConstructor.getTopologyName());
+        Assert.assertEquals("setBoltId", viaConstructor.getBoltId());
+        Assert.assertEquals("(setTopologyName:setBoltId)", viaConstructor.toString());
+        Assert.assertEquals(viaConstructor, viaSetters);
+        Assert.assertEquals(viaConstructor.hashCode(), viaSetters.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java
new file mode 100644
index 0000000..a2c0d6e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/** Exercises queue bookkeeping and equality of {@link MonitoredStream}. */
+public class MonitoredStreamTest {
+
+    @Test
+    public void testMonitoredStream() {
+        StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowPeriod("PT10S");
+        List<String> groupColumns = new ArrayList<>();
+        groupColumns.add("jobId");
+        StreamPartition partition = new StreamPartition();
+        partition.setColumns(groupColumns);
+        partition.setSortSpec(sortSpec);
+        partition.setStreamId("test");
+        partition.setType(StreamPartition.Type.GROUPBY);
+        StreamGroup group = new StreamGroup();
+        group.addStreamPartition(partition);
+
+        List<WorkSlot> slots = new ArrayList<>();
+        slots.add(new WorkSlot("setTopologyName", "setBoltId"));
+        StreamWorkSlotQueue queue = new StreamWorkSlotQueue(group, false, new HashMap<>(), slots);
+
+        // A new stream has no version and no queues, and exposes the group it was built with.
+        MonitoredStream stream = new MonitoredStream(group);
+        Assert.assertNull(stream.getVersion());
+        Assert.assertTrue(stream.getQueues().isEmpty());
+        Assert.assertEquals(group, stream.getStreamGroup());
+        stream.addQueues(queue);
+        Assert.assertEquals(queue, stream.getQueues().get(0));
+
+        // While holding a queue, the stream still equals a queue-less stream on the same group.
+        MonitoredStream sameGroupStream = new MonitoredStream(group);
+        Assert.assertEquals(stream, sameGroupStream);
+        Assert.assertEquals(stream.hashCode(), sameGroupStream.hashCode());
+        // Removing the queue empties the stream again and leaves equality unchanged.
+        stream.removeQueue(queue);
+        Assert.assertTrue(stream.getQueues().isEmpty());
+        Assert.assertEquals(stream, sameGroupStream);
+        Assert.assertEquals(stream.hashCode(), sameGroupStream.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java
new file mode 100644
index 0000000..1491c77
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Exercises the getters, {@code toString} and equality behaviour of {@link PolicyAssignment}. */
+public class PolicyAssignmentTest {
+    @Test
+    public void testPolicyAssignment() {
+        PolicyAssignment assignment = new PolicyAssignment("policy", "queue");
+        Assert.assertEquals("policy", assignment.getPolicyName());
+        Assert.assertEquals("queue", assignment.getQueueId());
+        Assert.assertNull(assignment.getVersion());
+        Assert.assertEquals("PolicyAssignment of policy policy, queueId queue, version null !", assignment.toString());
+        // An instance built from the same arguments is still not equal to this one.
+        Assert.assertFalse(assignment.equals(new PolicyAssignment("policy", "queue")));
+        Assert.assertNotSame(assignment, new PolicyAssignment("policy", "queue"));
+        Assert.assertFalse(assignment.hashCode() == new PolicyAssignment("policy", "queue").hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
new file mode 100644
index 0000000..d0f0189
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Exercises partition handling, the derived stream id, {@code toString} and equality of {@link StreamGroup}. */
+public class StreamGroupTest {
+    @Test
+    public void testStreamGroup() {
+        // An empty group renders no partitions and an empty id.
+        StreamGroup group = new StreamGroup();
+        Assert.assertEquals("StreamGroup partitions=: [] ", group.toString());
+        Assert.assertEquals("SG[]", group.getStreamId());
+        StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowPeriod("PT10S");
+        List<String> groupColumns = new ArrayList<>();
+        groupColumns.add("jobId");
+        StreamPartition partition = new StreamPartition();
+        partition.setColumns(groupColumns);
+        partition.setSortSpec(sortSpec);
+        partition.setStreamId("test");
+        partition.setType(StreamPartition.Type.GROUPBY);
+        group.addStreamPartition(partition);
+        Assert.assertEquals("SG[test-]", group.getStreamId());
+        Assert.assertEquals("StreamGroup partitions=: [StreamPartition[streamId=test,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]]] ", group.toString());
+        // The partition already in the group is renamed and added again, so it appears twice as "test1".
+        partition.setStreamId("test1");
+        List<StreamPartition> batch = new ArrayList<>();
+        batch.add(partition);
+        group.addStreamPartitions(batch);
+        Assert.assertEquals("SG[test1-test1-]", group.getStreamId());
+        // A second partition with only a stream id set is appended after the existing entries.
+        StreamPartition bare = new StreamPartition();
+        bare.setStreamId("test2");
+        batch = new ArrayList<>();
+        batch.add(bare);
+        group.addStreamPartitions(batch);
+        Assert.assertEquals("SG[test1-test1-test2-]", group.getStreamId());
+        Assert.assertEquals("StreamGroup partitions=: [StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test2,type=null,columns=[],sortSpec=[null]]] ", group.toString());
+        // A group populated with the same partitions is equal and hashes identically.
+        StreamGroup copy = new StreamGroup();
+        copy.addStreamPartitions(group.getStreamPartitions());
+        Assert.assertEquals(group, copy);
+        Assert.assertEquals(group.hashCode(), copy.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java
new file mode 100644
index 0000000..bc2f74e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/** Exercises the getters of {@link StreamWorkSlotQueue} and its equality behaviour. */
+public class StreamWorkSlotQueueTest {
+    @Test
+    public void testStreamWorkSlotQueue() {
+        StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowPeriod("PT10S");
+        List<String> groupColumns = new ArrayList<>();
+        groupColumns.add("jobId");
+        StreamPartition partition = new StreamPartition();
+        partition.setColumns(groupColumns);
+        partition.setSortSpec(sortSpec);
+        partition.setStreamId("test");
+        partition.setType(StreamPartition.Type.GROUPBY);
+        StreamGroup group = new StreamGroup();
+        group.addStreamPartition(partition);
+        WorkSlot slot = new WorkSlot("setTopologyName", "setBoltId");
+        List<WorkSlot> slots = new ArrayList<>();
+        slots.add(slot);
+        StreamWorkSlotQueue queue = new StreamWorkSlotQueue(group, false, new HashMap<>(), slots);
+        Assert.assertTrue(queue.getQueueId().startsWith("SG[test-]"));
+        Assert.assertTrue(queue.getDedicateOption().isEmpty());
+        Assert.assertEquals(0, queue.getNumberOfGroupBolts());
+        Assert.assertEquals(1, queue.getQueueSize());
+        Assert.assertTrue(queue.getTopoGroupStartIndex().isEmpty());
+        Assert.assertEquals(-1, queue.getTopologyGroupStartIndex(""));
+        Assert.assertEquals(slot, queue.getWorkingSlots().get(0));
+        // A second queue built from identical arguments is still a different, unequal queue.
+        StreamWorkSlotQueue twin = new StreamWorkSlotQueue(group, false, new HashMap<>(), slots);
+        Assert.assertFalse(queue.equals(twin));
+        Assert.assertNotSame(queue, twin);
+        Assert.assertFalse(queue.hashCode() == twin.hashCode());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java
new file mode 100644
index 0000000..760657a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TopologyTest { // Verifies the initial state of a new Topology and its equality behaviour.
+ @Test
+ public void testTopology() {
+ Topology topology = new Topology("test", 2, 3); // per the getters below: name, group-bolt count, alert-bolt count
+ Assert.assertNull(topology.getClusterName());
+ Assert.assertEquals("test", topology.getName());
+ Assert.assertNull(topology.getPubBoltId());
+ Assert.assertNull(topology.getSpoutId());
+ Assert.assertEquals(0, topology.getAlertBoltIds().size());
+ Assert.assertEquals(1, topology.getAlertParallelism());
+ Assert.assertEquals(0, topology.getGroupNodeIds().size());
+ Assert.assertEquals(1, topology.getGroupParallelism());
+ Assert.assertEquals(3, topology.getNumOfAlertBolt());
+ Assert.assertEquals(2, topology.getNumOfGroupBolt());
+ Assert.assertEquals(0, topology.getNumOfPublishBolt());
+ Assert.assertEquals(1, topology.getNumOfSpout());
+ Assert.assertEquals(1, topology.getSpoutParallelism());
+
+ Topology topology1 = new Topology("test", 2, 3);
+ // Two instances built from identical arguments are expected to be distinct and unequal.
+ Assert.assertFalse(topology1.equals(topology));
+ Assert.assertFalse(topology1.hashCode() == topology.hashCode()); // NOTE(review): unequal objects may legally share a hash code; a collision would fail this
+ Assert.assertNotSame(topology1, topology);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java
new file mode 100644
index 0000000..cc84c56
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class OverrideDeduplicatorSpecTest {
+ /** Checks equals()/hashCode() of OverrideDeduplicatorSpec against its class name and properties. */
+ @Test
+ public void testOverrideDeduplicatorSpec() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("kafka_broker", "localhost:9092");
+ properties.put("topic", "TEST_TOPIC_NAME");
+ OverrideDeduplicatorSpec overrideDeduplicatorSpec = new OverrideDeduplicatorSpec();
+ overrideDeduplicatorSpec.setClassName("testClass");
+ overrideDeduplicatorSpec.setProperties(properties);
+ // A second spec with the same class name and the very same properties map must be equal.
+ OverrideDeduplicatorSpec overrideDeduplicatorSpec1 = new OverrideDeduplicatorSpec();
+ overrideDeduplicatorSpec1.setClassName("testClass");
+ overrideDeduplicatorSpec1.setProperties(properties);
+
+ Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec);
+ Assert.assertTrue(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec));
+ Assert.assertTrue(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode());
+ // Changing only the class name must break equality.
+ overrideDeduplicatorSpec1.setClassName("testClass1");
+
+ Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec);
+ Assert.assertFalse(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec));
+ Assert.assertFalse(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode());
+ // Restore the class name, then differ only in the properties (one entry instead of two).
+ overrideDeduplicatorSpec1.setClassName("testClass");
+ Map<String, String> properties1 = new HashMap<>();
+ properties1.put("kafka_broker", "localhost:9092"); // populate the new map; the original wrote into the shared `properties` by mistake
+ overrideDeduplicatorSpec1.setProperties(properties1);
+
+ Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec);
+ Assert.assertFalse(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec));
+ Assert.assertFalse(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
new file mode 100644
index 0000000..7acb4f7
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+public class PolicyDefinitionTest {
+ // Covers toString(), equals() and hashCode() of the nested PolicyDefinition.Definition.
+ @Test
+ public void testPolicyInnerDefinition() {
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ def.setValue("test");
+ def.setType("siddhi");
+ def.setHandlerClass("setHandlerClass");
+ def.setProperties(new HashMap<>());
+ def.setOutputStreams(Arrays.asList("outputStream"));
+ def.setInputStreams(Arrays.asList("inputStream"));
+ Assert.assertEquals("{type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }", def.toString());
+ // A second definition populated with the same values.
+ PolicyDefinition.Definition def1 = new PolicyDefinition.Definition();
+ def1.setValue("test");
+ def1.setType("siddhi");
+ def1.setHandlerClass("setHandlerClass");
+ def1.setProperties(new HashMap<>());
+ def1.setOutputStreams(Arrays.asList("outputStream"));
+ def1.setInputStreams(Arrays.asList("inputStream"));
+
+ Assert.assertFalse(def == def1);
+ Assert.assertTrue(def.equals(def1));
+ Assert.assertTrue(def.hashCode() == def1.hashCode());
+ // Change only the input streams of the copy.
+ def1.setInputStreams(Arrays.asList("inputStream1"));
+
+ Assert.assertFalse(def.equals(def1));
+ Assert.assertTrue(def.hashCode() == def1.hashCode()); // Known weakness: equals() sees the inputStreams change but hashCode() stays the same.
+
+ }
+ // Covers toString(), equals() and hashCode() of PolicyDefinition itself, including its partitions.
+ @Test
+ public void testPolicyDefinition() {
+ PolicyDefinition pd = new PolicyDefinition();
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ def.setValue("test");
+ def.setType("siddhi");
+ def.setHandlerClass("setHandlerClass");
+ def.setProperties(new HashMap<>());
+ def.setOutputStreams(Arrays.asList("outputStream"));
+ def.setInputStreams(Arrays.asList("inputStream"));
+ pd.setDefinition(def);
+ pd.setInputStreams(Arrays.asList("inputStream")); // Not to be confused with the inputStreams of PolicyDefinition.Definition.
+ pd.setOutputStreams(Arrays.asList("outputStream")); // Not to be confused with the outputStreams of PolicyDefinition.Definition.
+ pd.setName("policyName");
+ pd.setDescription(String.format("Test policy for stream %s", "streamName"));
+
+ StreamPartition sp = new StreamPartition();
+ sp.setStreamId("streamName");
+ sp.setColumns(Arrays.asList("host"));
+ sp.setType(StreamPartition.Type.GROUPBY);
+ pd.addPartition(sp);
+ Assert.assertEquals("{name=\"policyName\",definition={type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }}", pd.toString());
+ // A second policy populated with the same values.
+ PolicyDefinition pd1 = new PolicyDefinition();
+ PolicyDefinition.Definition def1 = new PolicyDefinition.Definition();
+ def1.setValue("test");
+ def1.setType("siddhi");
+ def1.setHandlerClass("setHandlerClass");
+ def1.setProperties(new HashMap<>());
+ def1.setOutputStreams(Arrays.asList("outputStream"));
+ def1.setInputStreams(Arrays.asList("inputStream"));
+ pd1.setDefinition(def1);
+ pd1.setInputStreams(Arrays.asList("inputStream")); // Not to be confused with the inputStreams of PolicyDefinition.Definition.
+ pd1.setOutputStreams(Arrays.asList("outputStream")); // Not to be confused with the outputStreams of PolicyDefinition.Definition.
+ pd1.setName("policyName");
+ pd1.setDescription(String.format("Test policy for stream %s", "streamName"));
+
+ StreamPartition sp1 = new StreamPartition();
+ sp1.setStreamId("streamName");
+ sp1.setColumns(Arrays.asList("host"));
+ sp1.setType(StreamPartition.Type.GROUPBY);
+ pd1.addPartition(sp1);
+
+ // Identically populated policies are distinct objects but equal, with equal hash codes.
+ Assert.assertFalse(pd == pd1);
+ Assert.assertTrue(pd.equals(pd1));
+ Assert.assertTrue(pd.hashCode() == pd1.hashCode());
+ sp1.setStreamId("streamName1");
+ // A different partition stream id breaks both equals() and hashCode() equality.
+ Assert.assertFalse(pd == pd1);
+ Assert.assertFalse(pd.equals(pd1));
+ Assert.assertFalse(pd.hashCode() == pd1.hashCode());
+ // Restore the partition, then change only the output streams of the nested definition.
+ sp1.setStreamId("streamName");
+ def1.setOutputStreams(Arrays.asList("outputStream1"));
+
+ Assert.assertFalse(pd == pd1);
+ Assert.assertFalse(pd.equals(pd1));
+
+ Assert.assertTrue(pd.hashCode() == pd1.hashCode()); // Known weakness: equals() sees the nested outputStreams change but hashCode() stays the same.
+
+ }
+ // Policies that differ only in the explicitly set policy status must not be equal.
+ @Test
+ public void testPolicyDefinitionEqualByPolicyStatus() {
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+ PolicyDefinition policy1 = new PolicyDefinition();
+ policy1.setName("policy1");
+ policy1.setDefinition(definition);
+
+ PolicyDefinition policy2 = new PolicyDefinition();
+ policy2.setName("policy1");
+ policy2.setPolicyStatus(PolicyDefinition.PolicyStatus.DISABLED);
+ policy2.setDefinition(definition);
+
+ PolicyDefinition policy3 = new PolicyDefinition();
+ policy3.setName("policy1");
+ policy3.setDefinition(definition);
+ // policy3 mirrors policy1; policy2 additionally sets the status to DISABLED.
+ Assert.assertTrue(policy1.equals(policy3));
+ Assert.assertFalse(policy1.equals(policy2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java
new file mode 100644
index 0000000..494d8ca
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class PublishmentTest { // Covers toString(), equals() and hashCode() of Publishment.
+ @Test
+ public void testPublishment() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("kafka_broker", "localhost:9092");
+ properties.put("topic", "TEST_TOPIC_NAME");
+
+ List<Map<String, Object>> kafkaClientConfig = new ArrayList<>();
+ kafkaClientConfig.add(ImmutableMap.of("name", "producer.type", "value", "sync"));
+ properties.put("kafka_client_config", kafkaClientConfig);
+
+ PolicyDefinition policy = createPolicy("testStream", "testPolicy");
+ Publishment publishment = new Publishment();
+ publishment.setName("testAsyncPublishment");
+ publishment.setType("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher");
+ publishment.setPolicyIds(Arrays.asList(policy.getName()));
+ publishment.setDedupIntervalMin("PT0M");
+ OverrideDeduplicatorSpec overrideDeduplicatorSpec = new OverrideDeduplicatorSpec();
+ overrideDeduplicatorSpec.setClassName("testClass");
+ publishment.setOverrideDeduplicator(overrideDeduplicatorSpec);
+ publishment.setSerializer("org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer");
+ publishment.setProperties(properties);
+ // NOTE(review): the expected text below depends on the HashMap iteration order of `properties`, which is not guaranteed.
+ Assert.assertEquals("Publishment[name:testAsyncPublishment,type:org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher,policyId:[testPolicy],properties:{kafka_client_config=[{name=producer.type, value=sync}], topic=TEST_TOPIC_NAME, kafka_broker=localhost:9092}", publishment.toString());
+
+ // A second publishment populated with the same values (and sharing the same properties map).
+ Publishment publishment1 = new Publishment();
+ publishment1.setName("testAsyncPublishment");
+ publishment1.setType("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher");
+ publishment1.setPolicyIds(Arrays.asList(policy.getName()));
+ publishment1.setDedupIntervalMin("PT0M");
+ OverrideDeduplicatorSpec overrideDeduplicatorSpec1 = new OverrideDeduplicatorSpec();
+ overrideDeduplicatorSpec1.setClassName("testClass");
+ publishment1.setOverrideDeduplicator(overrideDeduplicatorSpec1);
+ publishment1.setSerializer("org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer");
+ publishment1.setProperties(properties);
+
+ Assert.assertTrue(publishment.equals(publishment1));
+ Assert.assertTrue(publishment.hashCode() == publishment1.hashCode());
+ Assert.assertFalse(publishment == publishment1);
+ publishment1.getOverrideDeduplicator().setClassName("testClass1");
+
+ // A different deduplicator class name breaks both equals() and hashCode() equality.
+ Assert.assertFalse(publishment.equals(publishment1));
+ Assert.assertFalse(publishment.hashCode() == publishment1.hashCode());
+ Assert.assertFalse(publishment == publishment1);
+ // Restore the class name, then differ only in the stream ids.
+ publishment1.getOverrideDeduplicator().setClassName("testClass");
+ publishment1.setStreamIds(Arrays.asList("streamid1,streamid2")); // NOTE(review): a single element containing a comma, not two ids - confirm intent
+ Assert.assertFalse(publishment.equals(publishment1));
+ Assert.assertFalse(publishment.hashCode() == publishment1.hashCode());
+ Assert.assertFalse(publishment == publishment1);
+ }
+ // Builds a minimal siddhi policy named policyName with one GROUPBY partition on column "host" of streamName.
+ private PolicyDefinition createPolicy(String streamName, String policyName) {
+ PolicyDefinition pd = new PolicyDefinition();
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ // expression, something like "PT5S,dynamic,1,host"
+ def.setValue("test");
+ def.setType("siddhi");
+ pd.setDefinition(def);
+ pd.setInputStreams(Arrays.asList("inputStream"));
+ pd.setOutputStreams(Arrays.asList("outputStream"));
+ pd.setName(policyName);
+ pd.setDescription(String.format("Test policy for stream %s", streamName));
+
+ StreamPartition sp = new StreamPartition();
+ sp.setStreamId(streamName);
+ sp.setColumns(Arrays.asList("host"));
+ sp.setType(StreamPartition.Type.GROUPBY);
+ pd.addPartition(sp);
+ return pd;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
new file mode 100644
index 0000000..91f9cf8
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PublishmentTypeTest { // Covers equals() and hashCode() of PublishmentType.
+ @Test
+ public void testPublishmentType() {
+ PublishmentType publishmentType = new PublishmentType();
+ publishmentType.setType("KAFKA");
+ publishmentType.setClassName("setClassName");
+ publishmentType.setDescription("setDescription");
+ publishmentType.setFields("setFields");
+ // A second instance populated with the same values.
+ PublishmentType publishmentType1 = new PublishmentType();
+ publishmentType1.setType("KAFKA");
+ publishmentType1.setClassName("setClassName");
+ publishmentType1.setDescription("setDescription");
+ publishmentType1.setFields("setFields");
+
+ Assert.assertFalse(publishmentType.equals(new String(""))); // an object of an unrelated type is never equal
+ Assert.assertFalse(publishmentType == publishmentType1);
+ Assert.assertTrue(publishmentType.equals(publishmentType1));
+ Assert.assertTrue(publishmentType.hashCode() == publishmentType1.hashCode());
+ // Changing only the type must break both equals() and hashCode() equality.
+ publishmentType1.setType("JMS");
+
+ Assert.assertFalse(publishmentType.equals(publishmentType1));
+ Assert.assertFalse(publishmentType.hashCode() == publishmentType1.hashCode());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java
new file mode 100644
index 0000000..ccc7717
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+
+
+public class StreamColumnTest { // Checks how StreamColumn.Builder converts a String default value for each column type.
+ // Shared rule: a test calls thrown.expect(...) right before the statement it expects to throw.
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+ // STRING: the default value is kept as a String and every field shows up in toString().
+ @Test
+ public void testStreamStringColumn() {
+ StreamColumn streamColumn = new StreamColumn.Builder().name("NAMEyhd").type(StreamColumn.Type.STRING).defaultValue("EAGLEyhd").required(true).build();
+ streamColumn.setNodataExpression("PT1M,dynamic,1,NAMEyhd");
+ Assert.assertEquals("StreamColumn=name[NAMEyhd], type=[string], defaultValue=[EAGLEyhd], required=[true], nodataExpression=[PT1M,dynamic,1,NAMEyhd]", streamColumn.toString());
+ Assert.assertTrue(streamColumn.getDefaultValue() instanceof String);
+ }
+ // LONG: a non-numeric default is expected to raise NumberFormatException.
+ @Test
+ public void testStreamLongColumn() {
+ thrown.expect(NumberFormatException.class);
+ new StreamColumn.Builder().name("salary").type(StreamColumn.Type.LONG).defaultValue("eagle").required(true).build();
+ }
+ // LONG: a numeric default is stored as a Long.
+ @Test
+ public void testStreamLongColumn1() {
+ StreamColumn streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.LONG).defaultValue("0").required(true).build();
+ streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+ Assert.assertEquals("StreamColumn=name[salary], type=[long], defaultValue=[0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+ Assert.assertTrue(streamColumn.getDefaultValue() instanceof Long);
+ }
+ // DOUBLE: a non-numeric default is expected to raise NumberFormatException.
+ @Test
+ public void testStreamDoubleColumn() {
+ thrown.expect(NumberFormatException.class);
+ new StreamColumn.Builder().name("salary").type(StreamColumn.Type.DOUBLE).defaultValue("eagle").required(true).build();
+ }
+ // DOUBLE: defaults "0.1", "-0.1" and "1" render as 0.1, -0.1 and 1.0; the last one is checked to be a Double.
+ @Test
+ public void testStreamDoubleColumn1() {
+ StreamColumn streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.DOUBLE).defaultValue("0.1").required(true).build();
+ streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+ Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+
+ streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.DOUBLE).defaultValue("-0.1").required(true).build();
+ streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+ Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[-0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+
+ streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.DOUBLE).defaultValue("1").required(true).build();
+ streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+ Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[1.0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+ Assert.assertTrue(streamColumn.getDefaultValue() instanceof Double);
+ }
+ // FLOAT: a non-numeric default is expected to raise NumberFormatException.
+ @Test
+ public void testStreamFloatColumn() {
+ thrown.expect(NumberFormatException.class);
+ new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).defaultValue("eagle").required(true).build();
+ }
+ // FLOAT: defaults "0.1", "-0.1" and "1" render as 0.1, -0.1 and 1.0; the last one is checked to be a Float.
+ @Test
+ public void testStreamFloatColumn1() {
+ StreamColumn streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).defaultValue("0.1").required(true).build();
+ streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+ Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+
+ streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).defaultValue("-0.1").required(true).build();
+ streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+ Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[-0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+
+ streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).defaultValue("1").required(true).build();
+ streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+ Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[1.0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+ Assert.assertTrue(streamColumn.getDefaultValue() instanceof Float);
+ }
+ // INT: a non-numeric default is expected to raise NumberFormatException.
+ @Test
+ public void testStreamIntColumn() {
+ thrown.expect(NumberFormatException.class);
+ new StreamColumn.Builder().name("salary").type(StreamColumn.Type.INT).defaultValue("eagle").required(true).build();
+ }
+ // INT: a decimal default ("0.1") is also expected to raise NumberFormatException.
+ @Test
+ public void testStreamIntColumn1() {
+ thrown.expect(NumberFormatException.class);
+ new StreamColumn.Builder().name("salary").type(StreamColumn.Type.INT).defaultValue("0.1").required(true).build();
+ }
+
+ // INT: defaults "1" and "0" are accepted; the last one is checked to be an Integer.
+ @Test
+ public void testStreamIntColumn2() {
+ StreamColumn streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.INT).defaultValue("1").required(true).build();
+ streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+ Assert.assertEquals("StreamColumn=name[salary], type=[int], defaultValue=[1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+
+ streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.INT).defaultValue("0").required(true).build();
+ streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+ Assert.assertEquals("StreamColumn=name[salary], type=[int], defaultValue=[0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+ Assert.assertTrue(streamColumn.getDefaultValue() instanceof Integer);
+ }
+ // BOOL: never throws; "eagle", "1" and "0" all yield false, only "True" yields true.
+ @Test
+ public void testStreamBoolColumn() {
+ StreamColumn streamBoolColumn = new StreamColumn.Builder().name("isYhd").type(StreamColumn.Type.BOOL).defaultValue("eagle").required(false).build();
+ streamBoolColumn.setNodataExpression("PT1M,dynamic,1,isYhd");
+ Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[false], nodataExpression=[PT1M,dynamic,1,isYhd]", streamBoolColumn.toString());
+ streamBoolColumn = new StreamColumn.Builder().name("isYhd").type(StreamColumn.Type.BOOL).defaultValue("1").required(true).build();
+ streamBoolColumn.setNodataExpression("PT1M,dynamic,1,isYhd");
+ Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", streamBoolColumn.toString());
+ streamBoolColumn = new StreamColumn.Builder().name("isYhd").type(StreamColumn.Type.BOOL).defaultValue("0").required(true).build();
+ streamBoolColumn.setNodataExpression("PT1M,dynamic,1,isYhd");
+ Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", streamBoolColumn.toString());
+ streamBoolColumn = new StreamColumn.Builder().name("isYhd").type(StreamColumn.Type.BOOL).defaultValue("True").required(true).build();
+ streamBoolColumn.setNodataExpression("PT1M,dynamic,1,isYhd");
+ Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[true], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", streamBoolColumn.toString());
+ Assert.assertTrue(streamBoolColumn.getDefaultValue() instanceof Boolean);
+ }
+ // OBJECT: a plain, non-JSON string default is expected to raise IllegalArgumentException.
+ @Test
+ public void testStreamObjectColumn() {
+ thrown.expect(IllegalArgumentException.class);
+ new StreamColumn.Builder().name("name").type(StreamColumn.Type.OBJECT).defaultValue("eagle").required(true).build();
+ }
+ // OBJECT: a JSON object default is turned into a HashMap.
+ @Test
+ public void testStreamObjectColumn1() {
+ StreamColumn streamColumn = new StreamColumn.Builder().name("name").type(StreamColumn.Type.OBJECT).defaultValue("{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}").required(true).build();
+ streamColumn.setNodataExpression("PT1M,dynamic,1,name");
+ Assert.assertEquals("StreamColumn=name[name], type=[object], defaultValue=[{name=heap.COMMITTED, Value=175636480}], required=[true], nodataExpression=[PT1M,dynamic,1,name]", streamColumn.toString());
+ Assert.assertTrue(streamColumn.getDefaultValue() instanceof HashMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
new file mode 100644
index 0000000..b5015cd
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Unit test for {@link StreamDefinition}: toString() rendering, column index
+ * lookup and the independence of the instance returned by copy().
+ */
+public class StreamDefinitionTest {
+    /**
+     * Populates a definition with five columns and verifies lookup by column
+     * name, the textual rendering before and after setting columns, and that
+     * copy() yields a separate, non-equal instance with the same rendering.
+     */
+    @Test
+    public void testStreamDefinition() {
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+
+        // Rendering of a freshly constructed definition, before any column is set.
+        StreamDefinition streamDefinition = new StreamDefinition();
+        Assert.assertEquals("StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[]", streamDefinition.toString());
+        streamDefinition.setColumns(streamColumns);
+
+        // "data" is the fourth column added (index 3); a differently-cased or
+        // unknown name is expected to yield -1.
+        Assert.assertEquals(3, streamDefinition.getColumnIndex("data"));
+        Assert.assertEquals(-1, streamDefinition.getColumnIndex("DATA"));
+        Assert.assertEquals(-1, streamDefinition.getColumnIndex("isYhd"));
+
+        // Shared expectation for the original and its copy; kept byte-for-byte
+        // identical to what toString() currently emits.
+        String expectedWithColumns = "StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[false], nodataExpression=[null]]";
+        Assert.assertEquals(expectedWithColumns, streamDefinition.toString());
+
+        StreamDefinition streamDefinition1 = streamDefinition.copy();
+        Assert.assertEquals(expectedWithColumns, streamDefinition1.toString());
+
+        // The copy must be a different object and must not compare equal to the original.
+        Assert.assertFalse(streamDefinition1.equals(streamDefinition));
+        Assert.assertNotSame(streamDefinition, streamDefinition1);
+        // Hash codes are deliberately not compared: Object.hashCode() does not
+        // guarantee distinct values for unequal objects, so asserting that the
+        // two hash codes differ could fail spuriously.
+    }
+}