You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/09 13:56:05 UTC

[1/2] incubator-eagle git commit: [EAGLE-749] Add unit test for model.

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 1da8dc4f1 -> 2b61cef58


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamPartitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamPartitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamPartitionTest.java
new file mode 100644
index 0000000..f7615df
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamPartitionTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamPartitionTest {
+    @Test
+    public void testStreamPartition() {
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+        streamSortSpec.setWindowPeriod("PT10S");
+        StreamPartition streamPartition = new StreamPartition();
+        Assert.assertEquals("StreamPartition[streamId=null,type=null,columns=[],sortSpec=[null]]", streamPartition.toString());
+        List<String> columns = new ArrayList<>();
+        columns.add("jobId");
+        streamPartition.setColumns(columns);
+        streamPartition.setSortSpec(streamSortSpec);
+        streamPartition.setStreamId("test");
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+        Assert.assertEquals("StreamPartition[streamId=test,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]]", streamPartition.toString());
+        Assert.assertTrue(streamPartition.equals(new StreamPartition(streamPartition)));
+        Assert.assertTrue(streamPartition.hashCode() == new StreamPartition(streamPartition).hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpecTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpecTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpecTest.java
new file mode 100644
index 0000000..d8123f1
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpecTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StreamSortSpecTest {
+    @Test
+    public void testStreamSortSpec() {
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+        Assert.assertEquals(30000, streamSortSpec.getWindowMargin());
+        Assert.assertEquals("", streamSortSpec.getWindowPeriod());
+        Assert.assertEquals(0, streamSortSpec.getWindowPeriodMillis());
+        Assert.assertEquals("StreamSortSpec[windowPeriod=,windowMargin=30000]", streamSortSpec.toString());
+        streamSortSpec.setWindowPeriod("PT60S");
+        Assert.assertEquals(60000, streamSortSpec.getWindowPeriodMillis());
+        Assert.assertEquals("PT60S", streamSortSpec.getWindowPeriod());
+        Assert.assertEquals("StreamSortSpec[windowPeriod=PT60S,windowMargin=30000]", streamSortSpec.toString());
+        streamSortSpec.setWindowMargin(20);
+        Assert.assertEquals(20, streamSortSpec.getWindowMargin());
+        streamSortSpec.setWindowPeriodMillis(50000);
+        Assert.assertEquals("StreamSortSpec[windowPeriod=PT50S,windowMargin=20]", streamSortSpec.toString());
+        streamSortSpec.setWindowPeriod2(Period.minutes(10));
+        Assert.assertEquals("StreamSortSpec[windowPeriod=PT10M,windowMargin=20]", streamSortSpec.toString());
+        Assert.assertTrue(streamSortSpec.equals(new StreamSortSpec(streamSortSpec)));
+        Assert.assertTrue(streamSortSpec.hashCode() == new StreamSortSpec(streamSortSpec).hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamingClusterTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamingClusterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamingClusterTest.java
new file mode 100644
index 0000000..2b79cf6
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamingClusterTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class StreamingClusterTest {
+    @Test
+    public void testStreamingCluster() {
+        StreamingCluster cluster = new StreamingCluster();
+        cluster.setName("test");
+        cluster.setDeployments(new HashMap<>());
+        cluster.setDescription("setDescription");
+        cluster.setType(StreamingCluster.StreamingType.STORM);
+        cluster.setZone("setZone");
+
+        StreamingCluster cluster1 = new StreamingCluster();
+        cluster1.setName("test");
+        cluster1.setDeployments(new HashMap<>());
+        cluster1.setDescription("setDescription");
+        cluster1.setType(StreamingCluster.StreamingType.STORM);
+        cluster1.setZone("setZone");
+
+
+        Assert.assertFalse(cluster == cluster1);
+        Assert.assertFalse(cluster.equals(cluster1));
+        Assert.assertFalse(cluster.hashCode() == cluster1.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
new file mode 100644
index 0000000..d534e3b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.*;
+
+public class AlertPublishEventTest {
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testAlertPublishEvent() {
+        thrown.expect(NullPointerException.class);
+        AlertStreamEvent alertStreamEvent = new AlertStreamEvent();
+        AlertPublishEvent.createAlertPublishEvent(alertStreamEvent);
+    }
+
+    @Test
+    public void testAlertPublishEvent1() {
+        thrown.expect(NullPointerException.class);
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+        streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+        streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setColumns(streamColumns);
+        AlertStreamEvent alertStreamEvent = new AlertStreamEvent();
+        alertStreamEvent.setSchema(streamDefinition);
+        AlertPublishEvent.createAlertPublishEvent(alertStreamEvent);
+    }
+
+    @Test
+    public void testAlertPublishEvent2() {
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+        streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+        streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setColumns(streamColumns);
+        AlertStreamEvent alertStreamEvent = new AlertStreamEvent();
+        alertStreamEvent.setData(new Object[]{"namevalue", "hostvalue", "1", 10, 0.1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", 1});
+        alertStreamEvent.setSchema(streamDefinition);
+        alertStreamEvent.setPolicyId("setPolicyId");
+        alertStreamEvent.setCreatedTime(1234);
+        AlertPublishEvent alertPublishEvent = AlertPublishEvent.createAlertPublishEvent(alertStreamEvent);
+        Assert.assertEquals(null, alertPublishEvent.getSiteId());
+        Assert.assertTrue(alertPublishEvent.getAlertId() != null);
+        Assert.assertEquals("setPolicyId", alertPublishEvent.getPolicyId());
+        Assert.assertEquals(null, alertPublishEvent.getPolicyValue());
+        Assert.assertEquals("{flag=1, data=0.1, name=namevalue, host=hostvalue, salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}", alertPublishEvent.getAlertData().toString());
+        Assert.assertEquals(1234, alertPublishEvent.getAlertTimestamp());
+        Assert.assertEquals(null, alertPublishEvent.getAppIds());
+
+        AlertPublishEvent alertPublishEvent1 = AlertPublishEvent.createAlertPublishEvent(alertStreamEvent);
+        Assert.assertFalse(alertPublishEvent1 == alertPublishEvent);
+        Assert.assertFalse(alertPublishEvent1.equals(alertPublishEvent));
+        Assert.assertFalse(alertPublishEvent1.hashCode() == alertPublishEvent.hashCode());
+
+        Map<String, Object> extraData = new HashMap<>();
+        extraData.put(AlertPublishEvent.SITE_ID_KEY, "SITE_ID_KEY");
+        extraData.put(AlertPublishEvent.POLICY_VALUE_KEY, "POLICY_VALUE_KEY");
+        extraData.put(AlertPublishEvent.APP_IDS_KEY, Arrays.asList("appId1", "appId2"));
+        alertStreamEvent.setExtraData(extraData);
+
+        alertPublishEvent = AlertPublishEvent.createAlertPublishEvent(alertStreamEvent);
+        Assert.assertEquals("SITE_ID_KEY", alertPublishEvent.getSiteId());
+        Assert.assertTrue(alertPublishEvent.getAlertId() != null);
+        Assert.assertEquals("setPolicyId", alertPublishEvent.getPolicyId());
+        Assert.assertEquals("POLICY_VALUE_KEY", alertPublishEvent.getPolicyValue());
+        Assert.assertEquals("{flag=1, data=0.1, name=namevalue, host=hostvalue, salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}", alertPublishEvent.getAlertData().toString());
+        Assert.assertEquals(1234, alertPublishEvent.getAlertTimestamp());
+        Assert.assertEquals("appId1", alertPublishEvent.getAppIds().get(0));
+        Assert.assertEquals("appId2", alertPublishEvent.getAppIds().get(1));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
new file mode 100644
index 0000000..eec3675
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class AlertStreamEventTest {
+
+    @Test
+    public void testAlertStreamEvent() {
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+        streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+        streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setColumns(streamColumns);
+        AlertStreamEvent alertStreamEvent = new AlertStreamEvent();
+        alertStreamEvent.setSchema(streamDefinition);
+        alertStreamEvent.setData(new Object[]{"namevalue", "hostvalue", "1", 10, 0.1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", 1});
+        Assert.assertEquals("AlertStreamEvent[stream=NULL,timestamp=1970-01-01 00:00:00,000,data=[namevalue,hostvalue,1,10,0.1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},1], policyId=null, createdBy=null, metaVersion=null]", alertStreamEvent.toString());
+        Assert.assertEquals("{flag=1, data=0.1, name=namevalue, host=hostvalue, salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}", alertStreamEvent.getDataMap().toString());
+
+        AlertStreamEvent alertStreamEvent1 = new AlertStreamEvent(alertStreamEvent);
+
+        Assert.assertEquals("AlertStreamEvent[stream=NULL,timestamp=1970-01-01 00:00:00,000,data=[namevalue,hostvalue,1,10,0.1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},1], policyId=null, createdBy=null, metaVersion=null]", alertStreamEvent1.toString());
+        Assert.assertEquals("{flag=1, data=0.1, name=namevalue, host=hostvalue, salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}", alertStreamEvent1.getDataMap().toString());
+
+
+        Assert.assertFalse(alertStreamEvent1 == alertStreamEvent);
+        Assert.assertTrue(alertStreamEvent1.equals(alertStreamEvent));
+        Assert.assertTrue(alertStreamEvent1.hashCode() == alertStreamEvent.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/PartitionedEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/PartitionedEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/PartitionedEventTest.java
new file mode 100644
index 0000000..719445b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/PartitionedEventTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PartitionedEventTest {
+    @Test
+    public void testPartitionedEvent() {
+        PartitionedEvent partitionedEvent = new PartitionedEvent();
+        Assert.assertEquals("PartitionedEvent[partition=null,event=null,key=0", partitionedEvent.toString());
+
+        Object[] data = new Object[]{"namevalue", "hostvalue", "1", 10, 0.1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", 1};
+        StreamEvent streamEvent = new StreamEvent("streamId", 1478667686971l, data);
+
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+        streamSortSpec.setWindowPeriod("PT10S");
+        StreamPartition streamPartition = new StreamPartition();
+        List<String> columns = new ArrayList<>();
+        columns.add("jobId");
+        streamPartition.setColumns(columns);
+        streamPartition.setSortSpec(streamSortSpec);
+        streamPartition.setStreamId("test");
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+
+        partitionedEvent = new PartitionedEvent(streamEvent, streamPartition, 1);
+        Assert.assertEquals("PartitionedEvent[partition=StreamPartition[streamId=test,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]],event=StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,hostvalue,1,10,0.1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},1],metaVersion=null],key=1", partitionedEvent.toString());
+        PartitionedEvent partitionedEventCopy = partitionedEvent.copy();
+        Assert.assertFalse(partitionedEventCopy == partitionedEvent);
+        Assert.assertTrue(partitionedEventCopy.equals(partitionedEvent));
+        Assert.assertTrue(partitionedEventCopy.hashCode() == partitionedEvent.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventBuilderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventBuilderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventBuilderTest.java
new file mode 100644
index 0000000..16c7ad2
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventBuilderTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class StreamEventBuilderTest {
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testStreamEventBuilder() {
+
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+        streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+        streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setColumns(streamColumns);
+
+        StreamEventBuilder streamEventBuilder = new StreamEventBuilder();
+        StreamEvent streamEvent = streamEventBuilder.schema(streamDefinition).streamId("streamId").metaVersion("metaVersion").timestamep(1478667686971l).build();
+        Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[],metaVersion=metaVersion]", streamEvent.toString());
+        Object[] data = new Object[]{"namevalue", "hostvalue", "1", 10.0, 1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", 4};
+        streamEvent = streamEventBuilder.schema(streamDefinition).attributes(data).streamId("streamId").metaVersion("metaVersion").timestamep(1478667686971l).build();
+        Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,hostvalue,1,10.0,1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},4],metaVersion=metaVersion]", streamEvent.toString());
+
+        Map<String, Object> mapdata = new HashMap<>();
+        mapdata.put("name", "namevalue");
+        mapdata.put("host", "hostvalue");
+        mapdata.put("flag", "1");
+        mapdata.put("value", 10.0);
+        mapdata.put("data", 1);
+        mapdata.put("salary", -0.2);
+        mapdata.put("object", "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}");
+        mapdata.put("int", 4);
+        StreamEvent streamEvent1 = streamEventBuilder.schema(streamDefinition).attributes(mapdata, streamDefinition).streamId("streamId").metaVersion("metaVersion").timestamep(1478667686971l).build();
+        Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,hostvalue,1,10.0,1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},4],metaVersion=metaVersion]", streamEvent.toString());
+
+        Assert.assertTrue(streamEvent1 == streamEvent);
+        Assert.assertTrue(streamEvent1.equals(streamEvent));
+        Assert.assertTrue(streamEvent1.hashCode() == streamEvent.hashCode());
+
+        StreamEventBuilder streamEventBuilder1 = new StreamEventBuilder();
+        streamEvent1 = streamEventBuilder1.schema(streamDefinition).attributes(mapdata, streamDefinition).streamId("streamId").metaVersion("metaVersion").timestamep(1478667686971l).build();
+
+        Assert.assertFalse(streamEvent1 == streamEvent);
+        Assert.assertTrue(streamEvent1.equals(streamEvent));
+        Assert.assertTrue(streamEvent1.hashCode() == streamEvent.hashCode());
+    }
+
+    @Test
+    public void testStreamEventBuilder1() {
+        thrown.expect(IllegalArgumentException.class);
+        StreamEventBuilder streamEventBuilder = new StreamEventBuilder();
+        streamEventBuilder.metaVersion("metaVersion").timestamep(1478667686971l).build();
+    }
+
+    @Test
+    public void testStreamEventBuilder2() {
+        StreamEventBuilder streamEventBuilder = new StreamEventBuilder();
+
+        Map<String, Object> mapdata = new HashMap<>();
+        mapdata.put("name", "namevalue");
+        mapdata.put("host", "hostvalue");
+        mapdata.put("flag", "1");
+        mapdata.put("value", 10.0);
+        mapdata.put("data", 1);
+        mapdata.put("salary", -0.2);
+        mapdata.put("object", "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}");
+        mapdata.put("int", 4);
+
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setColumns(streamColumns);
+
+        StreamEvent streamEvent = streamEventBuilder.attributes(mapdata, streamDefinition).streamId("streamId").metaVersion("metaVersion").timestamep(1478667686971l).build();
+        Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,hostvalue,4],metaVersion=metaVersion]", streamEvent.toString());
+    }
+
+    @Test
+    public void testStreamEventBuilder3() {
+        StreamEventBuilder streamEventBuilder = new StreamEventBuilder();
+
+        Map<String, Object> mapdata = new HashMap<>();
+        mapdata.put("name", "namevalue");
+        mapdata.put("host", "hostvalue");
+        mapdata.put("flag", "1");
+        mapdata.put("value", 10.0);
+        mapdata.put("data", 1);
+        mapdata.put("salary", -0.2);
+        mapdata.put("object", "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}");
+
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+        streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+        StreamColumn streamColumn = new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build();
+        streamColumn.setDefaultValue(100);
+        streamColumns.add(streamColumn);
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setColumns(streamColumns);
+
+        StreamEvent streamEvent = streamEventBuilder.attributes(mapdata, streamDefinition).streamId("streamId").metaVersion("metaVersion").timestamep(1478667686971l).build();
+        Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,hostvalue,1,10.0,1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},100],metaVersion=metaVersion]", streamEvent.toString());
+    }
+
+    @Test
+    public void testStreamEventBuilder4() {
+        StreamEventBuilder streamEventBuilder = new StreamEventBuilder();
+
+        Map<String, Object> mapdata = new HashMap<>();
+        mapdata.put("name", "namevalue");
+        mapdata.put("host1", "hostvalue");
+        mapdata.put("flag", "1");
+
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setColumns(streamColumns);
+
+        StreamEvent streamEvent = streamEventBuilder.attributes(mapdata, streamDefinition).streamId("streamId").metaVersion("metaVersion").timestamep(1478667686971l).build();
+        Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,,1,],metaVersion=metaVersion]", streamEvent.toString());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java
new file mode 100644
index 0000000..6543a8d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/StreamEventTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+public class StreamEventTest {
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testStreamEvent() {
+        Object[] data = new Object[]{"namevalue", "hostvalue", "1", 10.0, 1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", 4};
+        StreamEvent streamEvent = new StreamEvent("streamId", 1478667686971l, data);
+
+        Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,hostvalue,1,10.0,1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},4],metaVersion=null]", streamEvent.toString());
+
+        streamEvent = new StreamEvent("streamId", 1478667686971l, data, "metaVersion");
+
+        Assert.assertEquals("StreamEvent[stream=STREAMID,timestamp=2016-11-09 05:01:26,971,data=[namevalue,hostvalue,1,10.0,1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},4],metaVersion=metaVersion]", streamEvent.toString());
+        StreamEvent streamEventCopy = streamEvent.copy();
+        Assert.assertFalse(streamEventCopy == streamEvent);
+        Assert.assertTrue(streamEventCopy.equals(streamEvent));
+        Assert.assertTrue(streamEventCopy.hashCode() == streamEvent.hashCode());
+
+        streamEventCopy.setMetaVersion("");
+        Assert.assertFalse(streamEventCopy == streamEvent);
+        Assert.assertFalse(streamEventCopy.equals(streamEvent));
+        Assert.assertFalse(streamEventCopy.hashCode() == streamEvent.hashCode());
+
+        streamEventCopy.copyFrom(streamEvent);
+
+        Assert.assertFalse(streamEventCopy == streamEvent);
+        Assert.assertTrue(streamEventCopy.equals(streamEvent));
+        Assert.assertTrue(streamEventCopy.hashCode() == streamEvent.hashCode());
+
+
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+        streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+        streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setColumns(streamColumns);
+
+        Object[] values = streamEvent.getData(streamDefinition, "int", "salary", "flag", "object");
+        Assert.assertEquals(4, values[0]);
+        Assert.assertEquals(-0.2, values[1]);
+        Assert.assertEquals("1", values[2]);
+        Assert.assertEquals("{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", values[3]);
+
+        values = streamEvent.getData(streamDefinition, Arrays.asList("int", "data", "flag", "object"));
+        Assert.assertEquals(4, values[0]);
+        Assert.assertEquals(1, values[1]);
+        Assert.assertEquals("1", values[2]);
+        Assert.assertEquals("{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", values[3]);
+    }
+
+
+    @Test
+    public void testStreamEvent1() {
+        thrown.expect(IndexOutOfBoundsException.class);
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+        streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+        streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setColumns(streamColumns);
+        StreamEvent streamEvent = new StreamEvent();
+        streamEvent.setData(new Object[]{"namevalue", "hostvalue", "1", 10.0, 1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}\"", 4});
+        streamEvent.getData(streamDefinition, "salary", "isYhd");
+
+    }
+
+    @Test
+    public void testStreamEvent2() {
+        thrown.expect(IndexOutOfBoundsException.class);
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+        streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+        streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setColumns(streamColumns);
+        StreamEvent streamEvent = new StreamEvent();
+        streamEvent.setData(new Object[]{"namevalue", "hostvalue", "1", 10.0, 1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}\""});
+        streamEvent.getData(streamDefinition, "salary", "int");
+
+    }
+
+    @Test
+    public void testStreamEvent3() {
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).build());
+        streamColumns.add(new StreamColumn.Builder().name("object").type(StreamColumn.Type.OBJECT).build());
+        streamColumns.add(new StreamColumn.Builder().name("int").type(StreamColumn.Type.INT).build());
+
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setColumns(streamColumns);
+        StreamEvent streamEvent = new StreamEvent();
+        streamEvent.setData(new Object[]{"namevalue", 1, "flag", 10.0, 0.1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}\"", 1});
+        Object[] values = streamEvent.getData(streamDefinition, "value", "host");
+        Assert.assertEquals(10.0, values[0]);
+        Assert.assertEquals(1, values[1]);
+    }
+
+    @Test
+    public void testStreamEventEqual() {
+        Long timestamp = System.currentTimeMillis();
+        StreamEvent event1 = mockSimpleStreamEvent(timestamp);
+        StreamEvent event2 = mockSimpleStreamEvent(timestamp);
+        StreamEvent event3 = event2.copy();
+        Assert.assertEquals(event1, event2);
+        Assert.assertEquals(event2, event3);
+    }
+
+    private static StreamEvent mockSimpleStreamEvent(Long timestamp) {
+        return StreamEvent.builder()
+                .schema(mockStreamDefinition("sampleStream_1"))
+                .streamId("sampleStream_1")
+                .timestamep(timestamp)
+                .attributes(new HashMap<String, Object>() {{
+                    put("name", "cpu");
+                    put("value", 60.0);
+                    put("unknown", "unknown column value");
+                }}).build();
+    }
+
+    private static StreamDefinition mockStreamDefinition(String streamId) {
+        StreamDefinition sampleStreamDefinition = new StreamDefinition();
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+
+        sampleStreamDefinition.setStreamId(streamId);
+        sampleStreamDefinition.setTimeseries(true);
+        sampleStreamDefinition.setValidate(true);
+        sampleStreamDefinition.setDescription("Schema for " + streamId);
+        sampleStreamDefinition.setColumns(streamColumns);
+        return sampleStreamDefinition;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/StreamEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/StreamEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/StreamEventTest.java
deleted file mode 100644
index e8ec1ad..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/StreamEventTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.model;
-
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-public class StreamEventTest {
-    @Test
-    public void testStreamEventEqual() {
-        Long timestamp = System.currentTimeMillis();
-        StreamEvent event1 = mockSimpleStreamEvent(timestamp);
-        StreamEvent event2 = mockSimpleStreamEvent(timestamp);
-        StreamEvent event3 = event2.copy();
-        Assert.assertEquals(event1, event2);
-        Assert.assertEquals(event2, event3);
-    }
-
-    private static StreamEvent mockSimpleStreamEvent(Long timestamp) {
-        return StreamEvent.builder()
-            .schema(mockStreamDefinition("sampleStream_1"))
-            .streamId("sampleStream_1")
-            .timestamep(timestamp)
-            .attributes(new HashMap<String, Object>() {{
-                put("name", "cpu");
-                put("value", 60.0);
-                put("unknown", "unknown column value");
-            }}).build();
-    }
-
-    private static StreamDefinition mockStreamDefinition(String streamId) {
-        StreamDefinition sampleStreamDefinition = new StreamDefinition();
-        List<StreamColumn> streamColumns = new ArrayList<>();
-        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
-        streamColumns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
-        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
-
-        sampleStreamDefinition.setStreamId(streamId);
-        sampleStreamDefinition.setTimeseries(true);
-        sampleStreamDefinition.setValidate(true);
-        sampleStreamDefinition.setDescription("Schema for " + streamId);
-        sampleStreamDefinition.setColumns(streamColumns);
-        return sampleStreamDefinition;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java
deleted file mode 100644
index 1cc063d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/model/TestPolicyDefinition.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.model;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestPolicyDefinition {
-
-    @Test
-    public void testEqual() {
-        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
-        PolicyDefinition policy1 = new PolicyDefinition();
-        policy1.setName("policy1");
-        policy1.setDefinition(definition);
-
-        PolicyDefinition policy2 = new PolicyDefinition();
-        policy2.setName("policy1");
-        policy2.setPolicyStatus(PolicyDefinition.PolicyStatus.DISABLED);
-        policy2.setDefinition(definition);
-
-        PolicyDefinition policy3 = new PolicyDefinition();
-        policy3.setName("policy1");
-        policy3.setDefinition(definition);
-
-        Assert.assertTrue(policy1.equals(policy3));
-        Assert.assertFalse(policy1.equals(policy2));
-    }
-}


[2/2] incubator-eagle git commit: [EAGLE-749] Add unit test for model.

Posted by ha...@apache.org.
[EAGLE-749] Add unit test for model.

 - Add unit test for model which under alert-common moudle.
 - Fix equals() hashcode() inconsistent for PartitionedEvent,StreamEvent,Publishment.
 - Fix bug for StreamColumn BooleanType.

https://issues.apache.org/jira/browse/EAGLE-749

Author: r7raul1984 <ta...@yhd.com>

Closes #632 from r7raul1984/EAGLE-749.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/2b61cef5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/2b61cef5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/2b61cef5

Branch: refs/heads/master
Commit: 2b61cef585b02fb6f40d5c3de7f7c5c060926ecd
Parents: 1da8dc4
Author: r7raul1984 <ta...@yhd.com>
Authored: Wed Nov 9 21:55:51 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Nov 9 21:55:51 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/coordinator/Publishment.java   |  21 +-
 .../alert/engine/coordinator/StreamColumn.java  |   2 +-
 .../engine/coordinator/StreamSortSpec.java      |  17 +-
 .../alert/engine/model/PartitionedEvent.java    |  16 +-
 .../eagle/alert/engine/model/StreamEvent.java   |  11 +-
 .../eagle/alert/config/TestConfigBus.java       |   5 +-
 .../model/Kafka2TupleMetadataTest.java          |  49 +++++
 .../model/PolicyWorkerQueueTest.java            |  63 ++++++
 .../model/StreamRepartitionStrategyTest.java    |  74 +++++++
 .../model/StreamRouterSpecTest.java             |  59 ++++++
 .../model/Tuple2StreamMetadataTest.java         |  50 +++++
 .../alert/coordination/model/WorkSlotTest.java  |  43 +++++
 .../model/internal/MonitoredStreamTest.java     |  69 +++++++
 .../model/internal/PolicyAssignmentTest.java    |  37 ++++
 .../model/internal/StreamGroupTest.java         |  67 +++++++
 .../model/internal/StreamWorkSlotQueueTest.java |  61 ++++++
 .../model/internal/TopologyTest.java            |  47 +++++
 .../OverrideDeduplicatorSpecTest.java           |  61 ++++++
 .../coordinator/PolicyDefinitionTest.java       | 140 ++++++++++++++
 .../engine/coordinator/PublishmentTest.java     | 100 ++++++++++
 .../engine/coordinator/PublishmentTypeTest.java |  47 +++++
 .../engine/coordinator/StreamColumnTest.java    | 153 +++++++++++++++
 .../coordinator/StreamDefinitionTest.java       |  52 +++++
 .../engine/coordinator/StreamPartitionTest.java |  43 +++++
 .../engine/coordinator/StreamSortSpecTest.java  |  45 +++++
 .../coordinator/StreamingClusterTest.java       |  47 +++++
 .../engine/model/AlertPublishEventTest.java     | 110 +++++++++++
 .../engine/model/AlertStreamEventTest.java      |  63 ++++++
 .../engine/model/PartitionedEventTest.java      |  54 ++++++
 .../engine/model/StreamEventBuilderTest.java    | 166 ++++++++++++++++
 .../alert/engine/model/StreamEventTest.java     | 191 +++++++++++++++++++
 .../eagle/alert/model/StreamEventTest.java      |  68 -------
 .../eagle/alert/model/TestPolicyDefinition.java |  45 -----
 33 files changed, 1930 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index dbb1844..d1cc33a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -135,28 +135,29 @@ public class Publishment {
         if (obj instanceof Publishment) {
             Publishment p = (Publishment) obj;
             return (Objects.equals(name, p.getName()) && Objects.equals(type, p.getType())
-                && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
-                && Objects.equals(dedupFields, p.getDedupFields())
-                && Objects.equals(dedupStateField, p.getDedupStateField())
-                && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator())
-                && Objects.equals(policyIds, p.getPolicyIds())
-                && Objects.equals(streamIds, p.getStreamIds())
-                && properties.equals(p.getProperties()));
+                    && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
+                    && Objects.equals(dedupFields, p.getDedupFields())
+                    && Objects.equals(dedupStateField, p.getDedupStateField())
+                    && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator())
+                    && Objects.equals(policyIds, p.getPolicyIds())
+                    && Objects.equals(streamIds, p.getStreamIds())
+                    && properties.equals(p.getProperties()));
         }
         return false;
     }
 
     @Override
     public int hashCode() {
-        return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(policyIds)
-            .append(properties).build();
+        return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(dedupFields)
+                .append(dedupStateField).append(overrideDeduplicator).append(policyIds).append(streamIds)
+                .append(properties).build();
     }
 
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:")
-            .append(policyIds).append(",properties:").append(properties);
+                .append(policyIds).append(",properties:").append(properties);
         return sb.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index 5a5f2cc..4628043 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -85,7 +85,7 @@ public class StreamColumn implements Serializable {
                     this.setDefaultValue(Double.valueOf((String) this.getDefaultValue()));
                     break;
                 case BOOL:
-                    this.setDefaultValue(Double.valueOf((String) this.getDefaultValue()));
+                    this.setDefaultValue(Boolean.valueOf((String) this.getDefaultValue()));
                     break;
                 case OBJECT:
                     try {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
index 65b9151..ff05fc8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.engine.coordinator;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.eagle.alert.utils.TimePeriodUtils;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.commons.lang.builder.HashCodeBuilder;
@@ -45,7 +46,7 @@ public class StreamSortSpec implements Serializable {
     }
 
     public int getWindowPeriodMillis() {
-        if (windowPeriod != null) {
+        if (StringUtils.isNotBlank(windowPeriod)) {
             return TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(windowPeriod));
         } else {
             return 0;
@@ -76,9 +77,9 @@ public class StreamSortSpec implements Serializable {
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
-            .append(windowPeriod)
-            .append(windowMargin)
-            .toHashCode();
+                .append(windowPeriod)
+                .append(windowMargin)
+                .toHashCode();
     }
 
     @Override
@@ -92,14 +93,14 @@ public class StreamSortSpec implements Serializable {
 
         StreamSortSpec another = (StreamSortSpec) that;
         return
-            another.windowPeriod.equals(this.windowPeriod)
-                && another.windowMargin == this.windowMargin;
+                another.windowPeriod.equals(this.windowPeriod)
+                        && another.windowMargin == this.windowMargin;
     }
 
     @Override
     public String toString() {
         return String.format("StreamSortSpec[windowPeriod=%s,windowMargin=%d]",
-            this.getWindowPeriod(),
-            this.getWindowMargin());
+                this.getWindowPeriod(),
+                this.getWindowMargin());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
index 51e4532..ecca0ff 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
@@ -19,6 +19,7 @@ package org.apache.eagle.alert.engine.model;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import backtype.storm.tuple.Tuple;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -63,9 +64,9 @@ public class PartitionedEvent implements Serializable {
         if (obj instanceof PartitionedEvent) {
             PartitionedEvent another = (PartitionedEvent) obj;
             return !(this.partitionKey != another.getPartitionKey()
-                || !Objects.equals(this.event, another.getEvent())
-                || !Objects.equals(this.partition, another.getPartition())
-                || !Objects.equals(this.anchor, another.anchor));
+                    || !Objects.equals(this.event, another.getEvent())
+                    || !Objects.equals(this.partition, another.getPartition())
+                    || !Objects.equals(this.anchor, another.anchor));
         } else {
             return false;
         }
@@ -74,10 +75,11 @@ public class PartitionedEvent implements Serializable {
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
-            .append(partitionKey)
-            .append(event)
-            .append(partition)
-            .build();
+                .append(partitionKey)
+                .append(event)
+                .append(partition)
+                .append(anchor)
+                .build();
     }
 
     public StreamEvent getEvent() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
index 8480bc5..130985f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
@@ -95,7 +95,8 @@ public class StreamEvent implements Serializable {
         }
         if (obj instanceof StreamEvent) {
             StreamEvent another = (StreamEvent) obj;
-            return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp && Arrays.deepEquals(this.data, another.data);
+            return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp
+                    && Arrays.deepEquals(this.data, another.data) && Objects.equals(this.metaVersion, another.metaVersion);
         }
         return false;
     }
@@ -113,10 +114,10 @@ public class StreamEvent implements Serializable {
             }
         }
         return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]",
-            this.getStreamId(),
-            DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
-            StringUtils.join(dataStrings, ","),
-            this.getMetaVersion());
+                this.getStreamId(),
+                DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
+                StringUtils.join(dataStrings, ","),
+                this.getMetaVersion());
     }
 
     public static StreamEventBuilder builder() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
index 5c3f35e..e37e9be 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.config;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -69,8 +70,8 @@ public class TestConfigBus {
     }
 
     @After
-    public void shutdown() {
-        CloseableUtils.closeQuietly(server);
+    public void shutdown() throws IOException {
+        server.stop();
         producer.close();
         consumer.close();
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java
new file mode 100644
index 0000000..a252fae
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class Kafka2TupleMetadataTest {
+    @Test
+    public void testKafka2TupleMetadata() {
+        Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata();
+        kafka2TupleMetadata.setName("setName");
+        kafka2TupleMetadata.setCodec(new Tuple2StreamMetadata());
+        kafka2TupleMetadata.setType("setType");
+        kafka2TupleMetadata.setTopic("setTopic");
+        kafka2TupleMetadata.setSchemeCls("org.apache.eagle.alert.engine.scheme.PlainStringScheme");
+
+        Kafka2TupleMetadata kafka2TupleMetadata1 = new Kafka2TupleMetadata();
+        kafka2TupleMetadata1.setName("setName");
+        kafka2TupleMetadata1.setCodec(new Tuple2StreamMetadata());
+        kafka2TupleMetadata1.setType("setType");
+        kafka2TupleMetadata1.setTopic("setTopic");
+        kafka2TupleMetadata1.setSchemeCls("org.apache.eagle.alert.engine.scheme.PlainStringScheme");
+
+        Assert.assertFalse(kafka2TupleMetadata1 == kafka2TupleMetadata);
+        Assert.assertTrue(kafka2TupleMetadata1.equals(kafka2TupleMetadata));
+        Assert.assertTrue(kafka2TupleMetadata1.hashCode() == kafka2TupleMetadata.hashCode());
+
+        kafka2TupleMetadata1.setType("setType1");
+
+        Assert.assertFalse(kafka2TupleMetadata1.equals(kafka2TupleMetadata));
+        Assert.assertFalse(kafka2TupleMetadata1.hashCode() == kafka2TupleMetadata.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java
new file mode 100644
index 0000000..71f3188
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PolicyWorkerQueueTest {
+    @Test
+    public void testPolicyWorkerQueue() {
+
+        List<WorkSlot> workers = new ArrayList<>();
+        WorkSlot workSlot1 = new WorkSlot("setTopologyName1", "setBoltId1");
+        WorkSlot workSlot2 = new WorkSlot("setTopologyName1", "setBoltId2");
+        workers.add(workSlot1);
+        workers.add(workSlot2);
+        PolicyWorkerQueue policyWorkerQueue = new PolicyWorkerQueue(workers);
+        Assert.assertEquals(null, policyWorkerQueue.getPartition());
+        Assert.assertEquals(workSlot1, policyWorkerQueue.getWorkers().get(0));
+        Assert.assertEquals(workSlot2, policyWorkerQueue.getWorkers().get(1));
+        Assert.assertEquals("[(setTopologyName1:setBoltId1),(setTopologyName1:setBoltId2)]", policyWorkerQueue.toString());
+
+        PolicyWorkerQueue policyWorkerQueue1 = new PolicyWorkerQueue();
+        policyWorkerQueue1.setWorkers(workers);
+
+        Assert.assertTrue(policyWorkerQueue.equals(policyWorkerQueue1));
+        Assert.assertTrue(policyWorkerQueue.hashCode() == policyWorkerQueue1.hashCode());
+
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+        streamSortSpec.setWindowPeriod("PT10S");
+        StreamPartition streamPartition = new StreamPartition();
+        List<String> columns = new ArrayList<>();
+        columns.add("jobId");
+        streamPartition.setColumns(columns);
+        streamPartition.setSortSpec(streamSortSpec);
+        streamPartition.setStreamId("test");
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+        policyWorkerQueue1.setPartition(streamPartition);
+
+        Assert.assertFalse(policyWorkerQueue.equals(policyWorkerQueue1));
+        Assert.assertFalse(policyWorkerQueue.hashCode() == policyWorkerQueue1.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java
new file mode 100644
index 0000000..c416a49
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamRepartitionStrategyTest {
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testStreamRepartitionStrategy() {
+        thrown.expect(NullPointerException.class);
+        StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
+        streamRepartitionStrategy.hashCode();
+    }
+
+    @Test
+    public void testStreamRepartitionStrategy1() {
+        thrown.expect(NullPointerException.class);
+        StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
+        streamRepartitionStrategy.equals(streamRepartitionStrategy);
+    }
+
+    @Test
+    public void testStreamRepartitionStrategy2() {
+
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+        streamSortSpec.setWindowPeriod("PT10S");
+        StreamPartition streamPartition = new StreamPartition();
+        List<String> columns = new ArrayList<>();
+        columns.add("jobId");
+        streamPartition.setColumns(columns);
+        streamPartition.setSortSpec(streamSortSpec);
+        streamPartition.setStreamId("test");
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+
+
+        StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
+        Assert.assertEquals(null, streamRepartitionStrategy.getPartition());
+        Assert.assertEquals(0, streamRepartitionStrategy.getNumTotalParticipatingRouterBolts());
+        Assert.assertEquals(0, streamRepartitionStrategy.getStartSequence());
+        streamRepartitionStrategy.setPartition(streamPartition);
+        StreamRepartitionStrategy streamRepartitionStrategy1 = new StreamRepartitionStrategy();
+        streamRepartitionStrategy1.setPartition(streamPartition);
+
+        Assert.assertTrue(streamRepartitionStrategy.equals(streamRepartitionStrategy1));
+        Assert.assertTrue(streamRepartitionStrategy.hashCode() == streamRepartitionStrategy1.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java
new file mode 100644
index 0000000..88e72cb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamRouterSpecTest {
+    @Test
+    public void testStreamRouterSpec() {
+        StreamRouterSpec streamRouterSpec = new StreamRouterSpec();
+        Assert.assertEquals(null, streamRouterSpec.getPartition());
+        Assert.assertEquals(null, streamRouterSpec.getStreamId());
+        Assert.assertTrue(streamRouterSpec.getTargetQueue().isEmpty());
+
+        List<WorkSlot> workers = new ArrayList<>();
+        WorkSlot workSlot1 = new WorkSlot("setTopologyName1", "setBoltId1");
+        WorkSlot workSlot2 = new WorkSlot("setTopologyName1", "setBoltId2");
+        workers.add(workSlot1);
+        workers.add(workSlot2);
+        PolicyWorkerQueue policyWorkerQueue = new PolicyWorkerQueue(workers);
+        streamRouterSpec.addQueue(policyWorkerQueue);
+        streamRouterSpec.setStreamId("streamRouterSpec");
+
+        Assert.assertEquals("streamRouterSpec", streamRouterSpec.getStreamId());
+        Assert.assertEquals(1, streamRouterSpec.getTargetQueue().size());
+        Assert.assertEquals(2, streamRouterSpec.getTargetQueue().get(0).getWorkers().size());
+
+        StreamRouterSpec streamRouterSpec1 = new StreamRouterSpec();
+        streamRouterSpec1.addQueue(policyWorkerQueue);
+        streamRouterSpec1.setStreamId("streamRouterSpec1");
+
+        Assert.assertFalse(streamRouterSpec.equals(streamRouterSpec1));
+
+        streamRouterSpec1.setStreamId("streamRouterSpec");
+
+        Assert.assertTrue(streamRouterSpec.equals(streamRouterSpec1));
+        Assert.assertTrue(streamRouterSpec.hashCode() == streamRouterSpec1.hashCode());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java
new file mode 100644
index 0000000..8bbfc41
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class Tuple2StreamMetadataTest {
+    @Test
+    public void testTuple2StreamMetadata() {
+        Tuple2StreamMetadata metadata = new Tuple2StreamMetadata();
+        Set activeStreamNames = new HashSet<>();
+        activeStreamNames.add("defaultStringStream");
+        metadata.setStreamNameSelectorCls("org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector");
+        metadata.setStreamNameSelectorProp(new Properties());
+        metadata.getStreamNameSelectorProp().put("userProvidedStreamName", "defaultStringStream");
+        metadata.setActiveStreamNames(activeStreamNames);
+        metadata.setTimestampColumn("timestamp");
+
+        Tuple2StreamMetadata metadata1 = new Tuple2StreamMetadata();
+        Set activeStreamNames1 = new HashSet<>();
+        activeStreamNames1.add("defaultStringStream");
+        metadata1.setStreamNameSelectorCls("org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector");
+        metadata1.setStreamNameSelectorProp(new Properties());
+        metadata1.getStreamNameSelectorProp().put("userProvidedStreamName", "defaultStringStream");
+        metadata1.setActiveStreamNames(activeStreamNames1);
+        metadata1.setTimestampColumn("timestamp");
+
+        Assert.assertFalse(metadata == metadata1);
+        Assert.assertFalse(metadata.equals(metadata1));
+        Assert.assertFalse(metadata.hashCode() == metadata1.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java
new file mode 100644
index 0000000..48ee73b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class WorkSlotTest {
+    @Test
+    public void testWorkSlot() {
+        WorkSlot workSlot = new WorkSlot();
+        Assert.assertEquals("(null:null)", workSlot.toString());
+        Assert.assertEquals(null, workSlot.getBoltId());
+        Assert.assertEquals(null, workSlot.getTopologyName());
+        workSlot.setBoltId("setBoltId");
+        workSlot.setTopologyName("setTopologyName");
+        Assert.assertEquals("(setTopologyName:setBoltId)", workSlot.toString());
+        Assert.assertEquals("setBoltId", workSlot.getBoltId());
+        Assert.assertEquals("setTopologyName", workSlot.getTopologyName());
+
+        WorkSlot workSlot1 = new WorkSlot("setTopologyName", "setBoltId");
+        Assert.assertEquals("(setTopologyName:setBoltId)", workSlot1.toString());
+        Assert.assertEquals("setBoltId", workSlot1.getBoltId());
+        Assert.assertEquals("setTopologyName", workSlot1.getTopologyName());
+        Assert.assertTrue(workSlot1.equals(workSlot));
+        Assert.assertTrue(workSlot1.hashCode() == workSlot.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java
new file mode 100644
index 0000000..a2c0d6e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class MonitoredStreamTest {
+
+    @Test
+    public void testMonitoredStream() {
+
+        StreamGroup streamGroup = new StreamGroup();
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+        streamSortSpec.setWindowPeriod("PT10S");
+        StreamPartition streamPartition = new StreamPartition();
+        List<String> columns = new ArrayList<>();
+        columns.add("jobId");
+        streamPartition.setColumns(columns);
+        streamPartition.setSortSpec(streamSortSpec);
+        streamPartition.setStreamId("test");
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+        streamGroup.addStreamPartition(streamPartition);
+        WorkSlot workSlot = new WorkSlot("setTopologyName", "setBoltId");
+        List<WorkSlot> workSlots = new ArrayList<>();
+        workSlots.add(workSlot);
+        StreamWorkSlotQueue streamWorkSlotQueue = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), workSlots);
+
+        MonitoredStream monitoredStream = new MonitoredStream(streamGroup);
+        Assert.assertEquals(null, monitoredStream.getVersion());
+        Assert.assertTrue(monitoredStream.getQueues().isEmpty());
+        Assert.assertEquals(streamGroup, monitoredStream.getStreamGroup());
+        monitoredStream.addQueues(streamWorkSlotQueue);
+        Assert.assertEquals(streamWorkSlotQueue, monitoredStream.getQueues().get(0));
+
+        MonitoredStream monitoredStream1 = new MonitoredStream(streamGroup);
+        Assert.assertTrue(monitoredStream.equals(monitoredStream1));
+        Assert.assertTrue(monitoredStream.hashCode() == monitoredStream1.hashCode());
+
+        monitoredStream.removeQueue(streamWorkSlotQueue);
+        Assert.assertTrue(monitoredStream.getQueues().isEmpty());
+
+        Assert.assertTrue(monitoredStream.equals(monitoredStream1));
+        Assert.assertTrue(monitoredStream.hashCode() == monitoredStream1.hashCode());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java
new file mode 100644
index 0000000..1491c77
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PolicyAssignmentTest {
+    @Test
+    public void testPolicyAssignment() {
+        PolicyAssignment policyAssignment = new PolicyAssignment("policy", "queue");
+        Assert.assertEquals("policy", policyAssignment.getPolicyName());
+        Assert.assertEquals("queue", policyAssignment.getQueueId());
+        Assert.assertEquals(null, policyAssignment.getVersion());
+        Assert.assertEquals("PolicyAssignment of policy policy, queueId queue, version null !", policyAssignment.toString());
+
+        Assert.assertFalse(policyAssignment.equals(new PolicyAssignment("policy", "queue")));
+        Assert.assertFalse(policyAssignment == new PolicyAssignment("policy", "queue"));
+        Assert.assertFalse(policyAssignment.hashCode() == new PolicyAssignment("policy", "queue").hashCode());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
new file mode 100644
index 0000000..d0f0189
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamGroupTest {
+    @Test
+    public void testStreamGroup() {
+        StreamGroup streamGroup = new StreamGroup();
+        Assert.assertEquals("StreamGroup partitions=: [] ", streamGroup.toString());
+        Assert.assertEquals("SG[]", streamGroup.getStreamId());
+
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+        streamSortSpec.setWindowPeriod("PT10S");
+        StreamPartition streamPartition = new StreamPartition();
+        List<String> columns = new ArrayList<>();
+        columns.add("jobId");
+        streamPartition.setColumns(columns);
+        streamPartition.setSortSpec(streamSortSpec);
+        streamPartition.setStreamId("test");
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+        streamGroup.addStreamPartition(streamPartition);
+        Assert.assertEquals("SG[test-]", streamGroup.getStreamId());
+        Assert.assertEquals("StreamGroup partitions=: [StreamPartition[streamId=test,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]]] ", streamGroup.toString());
+
+        List<StreamPartition> streamPartitions = new ArrayList<>();
+        streamPartition.setStreamId("test1");
+        streamPartitions.add(streamPartition);
+        streamGroup.addStreamPartitions(streamPartitions);
+        Assert.assertEquals("SG[test1-test1-]", streamGroup.getStreamId());
+
+
+        streamPartitions = new ArrayList<>();
+        StreamPartition streamPartition1 = new StreamPartition();
+        streamPartition1.setStreamId("test2");
+        streamPartitions.add(streamPartition1);
+        streamGroup.addStreamPartitions(streamPartitions);
+        Assert.assertEquals("SG[test1-test1-test2-]", streamGroup.getStreamId());
+        Assert.assertEquals("StreamGroup partitions=: [StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test2,type=null,columns=[],sortSpec=[null]]] ", streamGroup.toString());
+
+        StreamGroup streamGroup1 = new StreamGroup();
+        streamGroup1.addStreamPartitions(streamGroup.getStreamPartitions());
+        Assert.assertTrue(streamGroup.equals(streamGroup1));
+        Assert.assertTrue(streamGroup.hashCode() == streamGroup1.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java
new file mode 100644
index 0000000..bc2f74e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class StreamWorkSlotQueueTest {
+    @Test
+    public void testStreamWorkSlotQueue() {
+        StreamGroup streamGroup = new StreamGroup();
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+        streamSortSpec.setWindowPeriod("PT10S");
+        StreamPartition streamPartition = new StreamPartition();
+        List<String> columns = new ArrayList<>();
+        columns.add("jobId");
+        streamPartition.setColumns(columns);
+        streamPartition.setSortSpec(streamSortSpec);
+        streamPartition.setStreamId("test");
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+        streamGroup.addStreamPartition(streamPartition);
+        WorkSlot workSlot = new WorkSlot("setTopologyName", "setBoltId");
+        List<WorkSlot> workSlots = new ArrayList<>();
+        workSlots.add(workSlot);
+        StreamWorkSlotQueue streamWorkSlotQueue = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), workSlots);
+
+        Assert.assertTrue(streamWorkSlotQueue.getQueueId().startsWith("SG[test-]"));
+        Assert.assertTrue(streamWorkSlotQueue.getDedicateOption().isEmpty());
+        Assert.assertEquals(0, streamWorkSlotQueue.getNumberOfGroupBolts());
+        Assert.assertEquals(1, streamWorkSlotQueue.getQueueSize());
+        Assert.assertTrue(streamWorkSlotQueue.getTopoGroupStartIndex().isEmpty());
+        Assert.assertEquals(-1, streamWorkSlotQueue.getTopologyGroupStartIndex(""));
+        Assert.assertEquals(workSlot, streamWorkSlotQueue.getWorkingSlots().get(0));
+
+        StreamWorkSlotQueue streamWorkSlotQueue1 = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), workSlots);
+        Assert.assertFalse(streamWorkSlotQueue.equals(streamWorkSlotQueue1));
+        Assert.assertFalse(streamWorkSlotQueue == streamWorkSlotQueue1);
+        Assert.assertFalse(streamWorkSlotQueue.hashCode() == streamWorkSlotQueue1.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java
new file mode 100644
index 0000000..760657a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TopologyTest {
+    @Test
+    public void testTopology() {
+        Topology topology = new Topology("test", 2, 3);
+        Assert.assertEquals(null, topology.getClusterName());
+        Assert.assertEquals("test", topology.getName());
+        Assert.assertEquals(null, topology.getPubBoltId());
+        Assert.assertEquals(null, topology.getSpoutId());
+        Assert.assertEquals(0, topology.getAlertBoltIds().size());
+        Assert.assertEquals(1, topology.getAlertParallelism());
+        Assert.assertEquals(0, topology.getGroupNodeIds().size());
+        Assert.assertEquals(1, topology.getGroupParallelism());
+        Assert.assertEquals(3, topology.getNumOfAlertBolt());
+        Assert.assertEquals(2, topology.getNumOfGroupBolt());
+        Assert.assertEquals(0, topology.getNumOfPublishBolt());
+        Assert.assertEquals(1, topology.getNumOfSpout());
+        Assert.assertEquals(1, topology.getSpoutParallelism());
+
+        Topology topology1 = new Topology("test", 2, 3);
+
+        Assert.assertFalse(topology1.equals(topology));
+        Assert.assertFalse(topology1.hashCode() == topology.hashCode());
+        Assert.assertFalse(topology1 == topology);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java
new file mode 100644
index 0000000..cc84c56
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class OverrideDeduplicatorSpecTest {
+
+    @Test
+    public void testOverrideDeduplicatorSpec() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("kafka_broker", "localhost:9092");
+        properties.put("topic", "TEST_TOPIC_NAME");
+        OverrideDeduplicatorSpec overrideDeduplicatorSpec = new OverrideDeduplicatorSpec();
+        overrideDeduplicatorSpec.setClassName("testClass");
+        overrideDeduplicatorSpec.setProperties(properties);
+
+        OverrideDeduplicatorSpec overrideDeduplicatorSpec1 = new OverrideDeduplicatorSpec();
+        overrideDeduplicatorSpec1.setClassName("testClass");
+        overrideDeduplicatorSpec1.setProperties(properties);
+
+        Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec);
+        Assert.assertTrue(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec));
+        Assert.assertTrue(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode());
+
+        overrideDeduplicatorSpec1.setClassName("testClass1");
+
+        Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec);
+        Assert.assertFalse(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec));
+        Assert.assertFalse(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode());
+
+        overrideDeduplicatorSpec1.setClassName("testClass");
+        Map<String, String> properties1 = new HashMap<>();
+        properties.put("kafka_broker", "localhost:9092");
+        overrideDeduplicatorSpec1.setProperties(properties1);
+
+        Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec);
+        Assert.assertFalse(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec));
+        Assert.assertFalse(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
new file mode 100644
index 0000000..7acb4f7
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+public class PolicyDefinitionTest {
+
+    @Test
+    public void testPolicyInnerDefinition() {
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        def.setValue("test");
+        def.setType("siddhi");
+        def.setHandlerClass("setHandlerClass");
+        def.setProperties(new HashMap<>());
+        def.setOutputStreams(Arrays.asList("outputStream"));
+        def.setInputStreams(Arrays.asList("inputStream"));
+        Assert.assertEquals("{type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }", def.toString());
+
+        PolicyDefinition.Definition def1 = new PolicyDefinition.Definition();
+        def1.setValue("test");
+        def1.setType("siddhi");
+        def1.setHandlerClass("setHandlerClass");
+        def1.setProperties(new HashMap<>());
+        def1.setOutputStreams(Arrays.asList("outputStream"));
+        def1.setInputStreams(Arrays.asList("inputStream"));
+
+        Assert.assertFalse(def == def1);
+        Assert.assertTrue(def.equals(def1));
+        Assert.assertTrue(def.hashCode() == def1.hashCode());
+
+        def1.setInputStreams(Arrays.asList("inputStream1"));
+
+        Assert.assertFalse(def.equals(def1));
+        Assert.assertTrue(def.hashCode() == def1.hashCode());//problem  equals() and hashCode() be inconsistent
+
+    }
+
+    @Test
+    public void testPolicyDefinition() {
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        def.setValue("test");
+        def.setType("siddhi");
+        def.setHandlerClass("setHandlerClass");
+        def.setProperties(new HashMap<>());
+        def.setOutputStreams(Arrays.asList("outputStream"));
+        def.setInputStreams(Arrays.asList("inputStream"));
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList("inputStream"));//confuse with PolicyDefinition.Definition InputStreams
+        pd.setOutputStreams(Arrays.asList("outputStream"));//confuse with PolicyDefinition.Definition OutputStreams
+        pd.setName("policyName");
+        pd.setDescription(String.format("Test policy for stream %s", "streamName"));
+
+        StreamPartition sp = new StreamPartition();
+        sp.setStreamId("streamName");
+        sp.setColumns(Arrays.asList("host"));
+        sp.setType(StreamPartition.Type.GROUPBY);
+        pd.addPartition(sp);
+        Assert.assertEquals("{name=\"policyName\",definition={type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }}", pd.toString());
+
+        PolicyDefinition pd1 = new PolicyDefinition();
+        PolicyDefinition.Definition def1 = new PolicyDefinition.Definition();
+        def1.setValue("test");
+        def1.setType("siddhi");
+        def1.setHandlerClass("setHandlerClass");
+        def1.setProperties(new HashMap<>());
+        def1.setOutputStreams(Arrays.asList("outputStream"));
+        def1.setInputStreams(Arrays.asList("inputStream"));
+        pd1.setDefinition(def1);
+        pd1.setInputStreams(Arrays.asList("inputStream"));//confuse with PolicyDefinition.Definition InputStreams
+        pd1.setOutputStreams(Arrays.asList("outputStream"));//confuse with PolicyDefinition.Definition OutputStreams
+        pd1.setName("policyName");
+        pd1.setDescription(String.format("Test policy for stream %s", "streamName"));
+
+        StreamPartition sp1 = new StreamPartition();
+        sp1.setStreamId("streamName");
+        sp1.setColumns(Arrays.asList("host"));
+        sp1.setType(StreamPartition.Type.GROUPBY);
+        pd1.addPartition(sp1);
+
+
+        Assert.assertFalse(pd == pd1);
+        Assert.assertTrue(pd.equals(pd1));
+        Assert.assertTrue(pd.hashCode() == pd1.hashCode());
+        sp1.setStreamId("streamName1");
+
+        Assert.assertFalse(pd == pd1);
+        Assert.assertFalse(pd.equals(pd1));
+        Assert.assertFalse(pd.hashCode() == pd1.hashCode());
+
+        sp1.setStreamId("streamName");
+        def1.setOutputStreams(Arrays.asList("outputStream1"));
+
+        Assert.assertFalse(pd == pd1);
+        Assert.assertFalse(pd.equals(pd1));
+
+        Assert.assertTrue(pd.hashCode() == pd1.hashCode());//problem  equals() and hashCode() be inconsistent
+
+    }
+
+    @Test
+    public void testPolicyDefinitionEqualByPolicyStatus() {
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        PolicyDefinition policy1 = new PolicyDefinition();
+        policy1.setName("policy1");
+        policy1.setDefinition(definition);
+
+        PolicyDefinition policy2 = new PolicyDefinition();
+        policy2.setName("policy1");
+        policy2.setPolicyStatus(PolicyDefinition.PolicyStatus.DISABLED);
+        policy2.setDefinition(definition);
+
+        PolicyDefinition policy3 = new PolicyDefinition();
+        policy3.setName("policy1");
+        policy3.setDefinition(definition);
+
+        Assert.assertTrue(policy1.equals(policy3));
+        Assert.assertFalse(policy1.equals(policy2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java
new file mode 100644
index 0000000..494d8ca
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class PublishmentTest {
+    @Test
+    public void testPublishment() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("kafka_broker", "localhost:9092");
+        properties.put("topic", "TEST_TOPIC_NAME");
+
+        List<Map<String, Object>> kafkaClientConfig = new ArrayList<>();
+        kafkaClientConfig.add(ImmutableMap.of("name", "producer.type", "value", "sync"));
+        properties.put("kafka_client_config", kafkaClientConfig);
+
+        PolicyDefinition policy = createPolicy("testStream", "testPolicy");
+        Publishment publishment = new Publishment();
+        publishment.setName("testAsyncPublishment");
+        publishment.setType("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher");
+        publishment.setPolicyIds(Arrays.asList(policy.getName()));
+        publishment.setDedupIntervalMin("PT0M");
+        OverrideDeduplicatorSpec overrideDeduplicatorSpec = new OverrideDeduplicatorSpec();
+        overrideDeduplicatorSpec.setClassName("testClass");
+        publishment.setOverrideDeduplicator(overrideDeduplicatorSpec);
+        publishment.setSerializer("org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer");
+        publishment.setProperties(properties);
+
+        Assert.assertEquals("Publishment[name:testAsyncPublishment,type:org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher,policyId:[testPolicy],properties:{kafka_client_config=[{name=producer.type, value=sync}], topic=TEST_TOPIC_NAME, kafka_broker=localhost:9092}", publishment.toString());
+
+
+        Publishment publishment1 = new Publishment();
+        publishment1.setName("testAsyncPublishment");
+        publishment1.setType("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher");
+        publishment1.setPolicyIds(Arrays.asList(policy.getName()));
+        publishment1.setDedupIntervalMin("PT0M");
+        OverrideDeduplicatorSpec overrideDeduplicatorSpec1 = new OverrideDeduplicatorSpec();
+        overrideDeduplicatorSpec1.setClassName("testClass");
+        publishment1.setOverrideDeduplicator(overrideDeduplicatorSpec1);
+        publishment1.setSerializer("org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer");
+        publishment1.setProperties(properties);
+
+        Assert.assertTrue(publishment.equals(publishment1));
+        Assert.assertTrue(publishment.hashCode() == publishment1.hashCode());
+        Assert.assertFalse(publishment == publishment1);
+        publishment1.getOverrideDeduplicator().setClassName("testClass1");
+
+
+        Assert.assertFalse(publishment.equals(publishment1));
+        Assert.assertFalse(publishment.hashCode() == publishment1.hashCode());
+        Assert.assertFalse(publishment == publishment1);
+
+        publishment1.getOverrideDeduplicator().setClassName("testClass");
+        publishment1.setStreamIds(Arrays.asList("streamid1,streamid2"));
+        Assert.assertFalse(publishment.equals(publishment1));
+        Assert.assertFalse(publishment.hashCode() == publishment1.hashCode());
+        Assert.assertFalse(publishment == publishment1);
+    }
+
+    private PolicyDefinition createPolicy(String streamName, String policyName) {
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        // expression, something like "PT5S,dynamic,1,host"
+        def.setValue("test");
+        def.setType("siddhi");
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList("inputStream"));
+        pd.setOutputStreams(Arrays.asList("outputStream"));
+        pd.setName(policyName);
+        pd.setDescription(String.format("Test policy for stream %s", streamName));
+
+        StreamPartition sp = new StreamPartition();
+        sp.setStreamId(streamName);
+        sp.setColumns(Arrays.asList("host"));
+        sp.setType(StreamPartition.Type.GROUPBY);
+        pd.addPartition(sp);
+        return pd;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
new file mode 100644
index 0000000..91f9cf8
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PublishmentTypeTest {
+    @Test
+    public void testPublishmentType() {
+        PublishmentType publishmentType = new PublishmentType();
+        publishmentType.setType("KAFKA");
+        publishmentType.setClassName("setClassName");
+        publishmentType.setDescription("setDescription");
+        publishmentType.setFields("setFields");
+
+        PublishmentType publishmentType1 = new PublishmentType();
+        publishmentType1.setType("KAFKA");
+        publishmentType1.setClassName("setClassName");
+        publishmentType1.setDescription("setDescription");
+        publishmentType1.setFields("setFields");
+
+        Assert.assertFalse(publishmentType.equals(new String("")));
+        Assert.assertFalse(publishmentType == publishmentType1);
+        Assert.assertTrue(publishmentType.equals(publishmentType1));
+        Assert.assertTrue(publishmentType.hashCode() == publishmentType1.hashCode());
+
+        publishmentType1.setType("JMS");
+
+        Assert.assertFalse(publishmentType.equals(publishmentType1));
+        Assert.assertFalse(publishmentType.hashCode() == publishmentType1.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java
new file mode 100644
index 0000000..ccc7717
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+
+
+public class StreamColumnTest {
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testStreamStringColumn() {
+        StreamColumn streamColumn = new StreamColumn.Builder().name("NAMEyhd").type(StreamColumn.Type.STRING).defaultValue("EAGLEyhd").required(true).build();
+        streamColumn.setNodataExpression("PT1M,dynamic,1,NAMEyhd");
+        Assert.assertEquals("StreamColumn=name[NAMEyhd], type=[string], defaultValue=[EAGLEyhd], required=[true], nodataExpression=[PT1M,dynamic,1,NAMEyhd]", streamColumn.toString());
+        Assert.assertTrue(streamColumn.getDefaultValue() instanceof String);
+    }
+
+    @Test
+    public void testStreamLongColumn() {
+        thrown.expect(NumberFormatException.class);
+        new StreamColumn.Builder().name("salary").type(StreamColumn.Type.LONG).defaultValue("eagle").required(true).build();
+    }
+
+    @Test
+    public void testStreamLongColumn1() {
+        StreamColumn streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.LONG).defaultValue("0").required(true).build();
+        streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[long], defaultValue=[0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+        Assert.assertTrue(streamColumn.getDefaultValue() instanceof Long);
+    }
+
+    @Test
+    public void testStreamDoubleColumn() {
+        thrown.expect(NumberFormatException.class);
+        new StreamColumn.Builder().name("salary").type(StreamColumn.Type.DOUBLE).defaultValue("eagle").required(true).build();
+    }
+
+    @Test
+    public void testStreamDoubleColumn1() {
+        StreamColumn streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.DOUBLE).defaultValue("0.1").required(true).build();
+        streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+
+        streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.DOUBLE).defaultValue("-0.1").required(true).build();
+        streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[-0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+
+        streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.DOUBLE).defaultValue("1").required(true).build();
+        streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[1.0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+        Assert.assertTrue(streamColumn.getDefaultValue() instanceof Double);
+    }
+
+    @Test
+    public void testStreamFloatColumn() {
+        thrown.expect(NumberFormatException.class);
+        new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).defaultValue("eagle").required(true).build();
+    }
+
+    @Test
+    public void testStreamFloatColumn1() {
+        StreamColumn streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).defaultValue("0.1").required(true).build();
+        streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+
+        streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).defaultValue("-0.1").required(true).build();
+        streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[-0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+
+        streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.FLOAT).defaultValue("1").required(true).build();
+        streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[1.0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+        Assert.assertTrue(streamColumn.getDefaultValue() instanceof Float);
+    }
+
+    @Test
+    public void testStreamIntColumn() {
+        thrown.expect(NumberFormatException.class);
+        new StreamColumn.Builder().name("salary").type(StreamColumn.Type.INT).defaultValue("eagle").required(true).build();
+    }
+
+    @Test
+    public void testStreamIntColumn1() {
+        thrown.expect(NumberFormatException.class);
+        new StreamColumn.Builder().name("salary").type(StreamColumn.Type.INT).defaultValue("0.1").required(true).build();
+    }
+
+
+    @Test
+    public void testStreamIntColumn2() {
+        StreamColumn streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.INT).defaultValue("1").required(true).build();
+        streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[int], defaultValue=[1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+
+        streamColumn = new StreamColumn.Builder().name("salary").type(StreamColumn.Type.INT).defaultValue("0").required(true).build();
+        streamColumn.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[int], defaultValue=[0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", streamColumn.toString());
+        Assert.assertTrue(streamColumn.getDefaultValue() instanceof Integer);
+    }
+
+    @Test
+    public void testStreamBoolColumn() {
+        StreamColumn streamBoolColumn = new StreamColumn.Builder().name("isYhd").type(StreamColumn.Type.BOOL).defaultValue("eagle").required(false).build();
+        streamBoolColumn.setNodataExpression("PT1M,dynamic,1,isYhd");
+        Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[false], nodataExpression=[PT1M,dynamic,1,isYhd]", streamBoolColumn.toString());
+        streamBoolColumn = new StreamColumn.Builder().name("isYhd").type(StreamColumn.Type.BOOL).defaultValue("1").required(true).build();
+        streamBoolColumn.setNodataExpression("PT1M,dynamic,1,isYhd");
+        Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", streamBoolColumn.toString());
+        streamBoolColumn = new StreamColumn.Builder().name("isYhd").type(StreamColumn.Type.BOOL).defaultValue("0").required(true).build();
+        streamBoolColumn.setNodataExpression("PT1M,dynamic,1,isYhd");
+        Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", streamBoolColumn.toString());
+        streamBoolColumn = new StreamColumn.Builder().name("isYhd").type(StreamColumn.Type.BOOL).defaultValue("True").required(true).build();
+        streamBoolColumn.setNodataExpression("PT1M,dynamic,1,isYhd");
+        Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[true], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", streamBoolColumn.toString());
+        Assert.assertTrue(streamBoolColumn.getDefaultValue() instanceof Boolean);
+    }
+
+    @Test
+    public void testStreamObjectColumn() {
+        thrown.expect(IllegalArgumentException.class);
+        new StreamColumn.Builder().name("name").type(StreamColumn.Type.OBJECT).defaultValue("eagle").required(true).build();
+    }
+
+    @Test
+    public void testStreamObjectColumn1() {
+        StreamColumn streamColumn = new StreamColumn.Builder().name("name").type(StreamColumn.Type.OBJECT).defaultValue("{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}").required(true).build();
+        streamColumn.setNodataExpression("PT1M,dynamic,1,name");
+        Assert.assertEquals("StreamColumn=name[name], type=[object], defaultValue=[{name=heap.COMMITTED, Value=175636480}], required=[true], nodataExpression=[PT1M,dynamic,1,name]", streamColumn.toString());
+        Assert.assertTrue(streamColumn.getDefaultValue() instanceof HashMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
new file mode 100644
index 0000000..b5015cd
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamDefinitionTest {
+    @Test
+    public void testStreamDefinition() {
+
+        List<StreamColumn> streamColumns = new ArrayList<>();
+        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+
+        StreamDefinition streamDefinition = new StreamDefinition();
+        Assert.assertEquals("StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[]", streamDefinition.toString());
+        streamDefinition.setColumns(streamColumns);
+
+        Assert.assertEquals(3, streamDefinition.getColumnIndex("data"));
+        Assert.assertEquals(-1, streamDefinition.getColumnIndex("DATA"));
+        Assert.assertEquals(-1, streamDefinition.getColumnIndex("isYhd"));
+        Assert.assertEquals("StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[false], nodataExpression=[null]]", streamDefinition.toString());
+        StreamDefinition streamDefinition1 = streamDefinition.copy();
+        Assert.assertEquals("StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[false], nodataExpression=[null]]", streamDefinition1.toString());
+
+        Assert.assertFalse(streamDefinition1.equals(streamDefinition));
+        Assert.assertFalse(streamDefinition1 == streamDefinition);
+        Assert.assertFalse(streamDefinition1.hashCode() == streamDefinition.hashCode());
+    }
+}