You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/11/11 03:17:42 UTC

incubator-eagle git commit: [EAGLE-750] Improve coordinator schedule strategy to reuse alert work slot

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 0a0d1f6ee -> 75ab7722c


[EAGLE-750] Improve coordinator schedule strategy to reuse alert work slot

1. Add the switch coordinator.reuseBoltInStreams to enable/disable reusing a bolt across multiple streams
2. Add the config coordinator.streamsPerBolt to set the maximum number of streams per bolt
3. Allow a dedicated bolt for a specific stream

Author: Li, Garrett
Reviewer: ralphsu

This closes #638


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/75ab7722
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/75ab7722
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/75ab7722

Branch: refs/heads/master
Commit: 75ab7722cd9e87e2380d9fac4fd6922513cc23f3
Parents: 0a0d1f6
Author: Xiancheng Li <xi...@ebay.com>
Authored: Wed Nov 9 15:17:15 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Fri Nov 11 11:19:24 2016 +0800

----------------------------------------------------------------------
 .../model/internal/StreamGroup.java             |  25 +++-
 .../engine/coordinator/PolicyDefinition.java    |  28 ++--
 .../model/internal/StreamGroupTest.java         |   7 +-
 .../alert/coordinator/CoordinatorConstants.java |   2 +
 .../coordinator/impl/GreedyPolicyScheduler.java |   4 +-
 .../strategies/SameTopologySlotStrategy.java    |  27 +++-
 .../alert/coordinator/WorkSlotStrategyTest.java | 146 ++++++++++++++++++-
 .../resources/application-multiplestreams.conf  |  53 +++++++
 .../resources/application-multiplestreams2.conf |  52 +++++++
 .../eagle/common/TestSerializableUtils.java     |   2 +
 10 files changed, 320 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75ab7722/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
index 7941b85..9ceb7c8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
@@ -17,17 +17,17 @@
 package org.apache.eagle.alert.coordination.model.internal;
 
 
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.base.Objects;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class StreamGroup {
 
+    private boolean dedicated; 
     private List<StreamPartition> streamPartitions = new ArrayList<StreamPartition>();
 
     public StreamGroup() {
@@ -41,10 +41,25 @@ public class StreamGroup {
         this.streamPartitions.add(sp);
     }
 
+    public void addStreamPartition(StreamPartition sp, boolean dedicated) {
+        this.dedicated = dedicated;
+        this.streamPartitions.add(sp);
+    }
+
     public void addStreamPartitions(List<StreamPartition> sps) {
         this.streamPartitions.addAll(sps);
     }
 
+    public void addStreamPartitions(List<StreamPartition> sps, boolean dedicated) {
+        this.dedicated = dedicated;
+        this.streamPartitions.addAll(sps);
+    }
+
+    @JsonIgnore
+    public boolean isDedicated() {
+        return dedicated;
+    }
+
     @JsonIgnore
     public String getStreamId() {
         StringBuilder sb = new StringBuilder("SG[");
@@ -58,7 +73,7 @@ public class StreamGroup {
     @Override
     public int hashCode() {
         // implicitly all groups in stream groups will be built for hash code
-        return new HashCodeBuilder().append(streamPartitions).build();
+        return new HashCodeBuilder().append(streamPartitions).append(dedicated).build();
     }
 
     @Override
@@ -67,12 +82,12 @@ public class StreamGroup {
             return false;
         }
         StreamGroup o = (StreamGroup) obj;
-        return Objects.equal(this.streamPartitions, o.streamPartitions);
+        return Objects.equal(this.streamPartitions, o.streamPartitions) && Objects.equal(this.dedicated, o.dedicated);
     }
 
     @Override
     public String toString() {
-        return String.format("StreamGroup partitions=: %s ", streamPartitions);
+        return String.format("StreamGroup dedicated=: %s partitions=: %s ", dedicated, streamPartitions);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75ab7722/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index c131e12..94d84f2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -42,6 +42,7 @@ public class PolicyDefinition implements Serializable {
 
     // one stream only have one partition in one policy, since we don't support stream alias
     private List<StreamPartition> partitionSpec = new ArrayList<StreamPartition>();
+    private boolean dedicated;
 
     // runtime configuration for policy, these are user-invisible
     private int parallelismHint = 1;
@@ -106,6 +107,14 @@ public class PolicyDefinition implements Serializable {
         this.partitionSpec.add(par);
     }
 
+    public boolean isDedicated() {
+        return dedicated;
+    }
+
+    public void setDedicated(boolean dedicated) {
+        this.dedicated = dedicated;
+    }
+
     public int getParallelismHint() {
         return parallelismHint;
     }
@@ -125,13 +134,13 @@ public class PolicyDefinition implements Serializable {
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
-                .append(name)
-                .append(inputStreams)
-                .append(outputStreams)
-                .append(definition)
-                .append(partitionSpec)
-                .append(policyStatus)
-                .append(parallelismHint)
+            .append(name)
+            .append(inputStreams)
+            .append(outputStreams)
+            .append(definition)
+            .append(partitionSpec)
+            .append(policyStatus)
+            .append(parallelismHint)
             .build();
     }
 
@@ -154,9 +163,8 @@ public class PolicyDefinition implements Serializable {
             && (another.definition != null && another.definition.equals(this.definition))
             && Objects.equals(this.definition, another.definition)
             && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec)
-                && another.policyStatus.equals(this.policyStatus)
-                && another.parallelismHint == this.parallelismHint
-            ) {
+            && another.policyStatus.equals(this.policyStatus)
+            && another.parallelismHint == this.parallelismHint) {
             return true;
         }
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75ab7722/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
index d0f0189..467100c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
@@ -28,7 +28,7 @@ public class StreamGroupTest {
     @Test
     public void testStreamGroup() {
         StreamGroup streamGroup = new StreamGroup();
-        Assert.assertEquals("StreamGroup partitions=: [] ", streamGroup.toString());
+        Assert.assertEquals("StreamGroup dedicated=: false partitions=: [] ", streamGroup.toString());
         Assert.assertEquals("SG[]", streamGroup.getStreamId());
 
         StreamSortSpec streamSortSpec = new StreamSortSpec();
@@ -42,7 +42,8 @@ public class StreamGroupTest {
         streamPartition.setType(StreamPartition.Type.GROUPBY);
         streamGroup.addStreamPartition(streamPartition);
         Assert.assertEquals("SG[test-]", streamGroup.getStreamId());
-        Assert.assertEquals("StreamGroup partitions=: [StreamPartition[streamId=test,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]]] ", streamGroup.toString());
+        Assert.assertEquals("StreamGroup dedicated=: false partitions=: [StreamPartition[streamId=test,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]]] ",
+            streamGroup.toString());
 
         List<StreamPartition> streamPartitions = new ArrayList<>();
         streamPartition.setStreamId("test1");
@@ -57,7 +58,7 @@ public class StreamGroupTest {
         streamPartitions.add(streamPartition1);
         streamGroup.addStreamPartitions(streamPartitions);
         Assert.assertEquals("SG[test1-test1-test2-]", streamGroup.getStreamId());
-        Assert.assertEquals("StreamGroup partitions=: [StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test2,type=null,columns=[],sortSpec=[null]]] ", streamGroup.toString());
+        Assert.assertEquals("StreamGroup dedicated=: false partitions=: [StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test2,type=null,columns=[],sortSpec=[null]]] ", streamGroup.toString());
 
         StreamGroup streamGroup1 = new StreamGroup();
         streamGroup1.addStreamPartitions(streamGroup.getStreamPartitions());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75ab7722/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
index 0a09de2..c026785 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
@@ -24,4 +24,6 @@ public class CoordinatorConstants {
     public static final String BOLT_PARALLELISM = "boltParallelism";
     public static final String NUM_OF_ALERT_BOLTS_PER_TOPOLOGY = "numOfAlertBoltsPerTopology";
     public static final String POLICIES_PER_BOLT = "policiesPerBolt";
+    public static final String REUSE_BOLT_IN_STREAMS = "reuseBoltInStreams";
+    public static final String STREAMS_PER_BOLT = "streamsPerBolt";
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75ab7722/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
index 49a16ff..89f7cdb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
@@ -220,7 +220,7 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
             result.message = "policy doesn't have partition spec";
             return result;
         }
-        policyStreamPartition.addStreamPartitions(item.def.getPartitionSpec());
+        policyStreamPartition.addStreamPartitions(item.def.getPartitionSpec(), item.def.isDedicated());
 
         MonitoredStream targetdStream = context.getMonitoredStreams().get(policyStreamPartition);
         if (targetdStream == null) {
@@ -271,7 +271,7 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
         if (targetQueue == null) {
             WorkQueueBuilder builder = new WorkQueueBuilder(context, mgmtService);
             // TODO : get the properties from policy definiton
-            targetQueue = builder.createQueue(targetdStream, false, getQueueSize(def.getParallelismHint()),
+            targetQueue = builder.createQueue(targetdStream, def.isDedicated(), getQueueSize(def.getParallelismHint()),
                 new HashMap<String, Object>());
         }
         return targetQueue;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75ab7722/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
index 823a548..0e5fd00 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
@@ -17,6 +17,7 @@
 package org.apache.eagle.alert.coordinator.impl.strategies;
 
 import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND;
+
 import org.apache.eagle.alert.coordination.model.WorkSlot;
 import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
 import org.apache.eagle.alert.coordination.model.internal.Topology;
@@ -36,11 +37,9 @@ import java.util.*;
 /**
  * A simple strategy that only find the bolts in the same topology as the
  * required work slots.
- *
- * <p>Invariant:<br/>
+ * Invariant:<br/>
  * One slot queue only on the one topology.<br/>
- * One topology doesn't contains two same partition slot queues.</p>
- *
+ * One topology doesn't contains two same partition slot queues.
  * @since Apr 27, 2016
  */
 public class SameTopologySlotStrategy implements IWorkSlotStrategy {
@@ -53,6 +52,8 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy {
 
     //    private final int numOfPoliciesBoundPerBolt;
     private final double topoLoadUpbound;
+    private final boolean reuseBoltInStreams;
+    private final int streamsPerBolt;
 
     public SameTopologySlotStrategy(IScheduleContext context, StreamGroup streamPartitionGroup,
                                     TopologyMgmtService mgmtService) {
@@ -63,6 +64,16 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy {
         Config config = ConfigFactory.load().getConfig(CoordinatorConstants.CONFIG_ITEM_COORDINATOR);
         // numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT);
         topoLoadUpbound = config.getDouble(CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND);
+        if (config.hasPath(CoordinatorConstants.REUSE_BOLT_IN_STREAMS)) {
+            reuseBoltInStreams = config.getBoolean(CoordinatorConstants.REUSE_BOLT_IN_STREAMS);
+        } else {
+            reuseBoltInStreams = false;
+        }
+        if (config.hasPath(CoordinatorConstants.STREAMS_PER_BOLT)) {
+            streamsPerBolt = config.getInt(CoordinatorConstants.STREAMS_PER_BOLT);
+        } else {
+            streamsPerBolt = 1;
+        }
     }
 
     /**
@@ -145,7 +156,13 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy {
 
     private boolean isBoltAvailable(AlertBoltUsage alertUsage) {
         // FIXME : more detail to compare on queue exclusion check
-        if (alertUsage.getQueueSize() > 0) {
+        if (alertUsage.getPartitions().stream().filter(partition -> partition.isDedicated()).count() > 0) {
+            return false;
+        }
+        if (!reuseBoltInStreams && alertUsage.getQueueSize() > 0) {
+            return false;
+        }
+        if (reuseBoltInStreams && alertUsage.getQueueSize() >= streamsPerBolt) {
             return false;
         }
         // actually it's now 0;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75ab7722/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
index 077e619..56ee980 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.alert.coordinator;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.alert.coordinator.mock.TestTopologyMgmtService;
 import org.apache.eagle.alert.coordination.model.WorkSlot;
@@ -37,6 +40,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.ConfigFactory;
+
 /**
  * @since Apr 27, 2016
  */
@@ -104,7 +110,7 @@ public class WorkSlotStrategyTest {
         partition.setStreamId("s1");
         partition.setColumns(Arrays.asList("f1", "f2"));
         StreamGroup sg = new StreamGroup();
-        sg.addStreamPartition(partition);
+        sg.addStreamPartition(partition, false);
 
         MonitoredStream ms1 = new MonitoredStream(sg);
 
@@ -155,4 +161,142 @@ public class WorkSlotStrategyTest {
             Assert.assertNotEquals(topo1, topo2);
         }
     }
+    
+    @Test
+    public void testMultipleStreams() {
+    	ConfigFactory.invalidateCaches();
+        System.setProperty("config.resource", "/application-multiplestreams.conf");
+    	
+        InMemScheduleConext context = new InMemScheduleConext();
+
+        StreamGroup group1 = createStreamGroup("s1", Arrays.asList("f1", "f2"), true);
+        StreamGroup group2 = createStreamGroup("s2", Arrays.asList("f2", "f3"), false);
+        StreamGroup group3 = createStreamGroup("s3", Arrays.asList("f4"), false);
+        StreamGroup group4 = createStreamGroup("s4", Arrays.asList("f5"), false);
+
+        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(3, 4, "prefix-time1", true);
+        WorkQueueBuilder wrb = new WorkQueueBuilder(context, mgmtService);
+        {
+            StreamWorkSlotQueue queue = wrb.createQueue(new MonitoredStream(group1), group1.isDedicated(), 2, new HashMap<String, Object>());
+            print(context.getTopologyUsages().values());
+            
+            TopologyUsage usage = context.getTopologyUsages().values().iterator().next();
+            
+            Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group1));
+            Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().size());
+            Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().get(0).getWorkingSlots().size());
+            
+            List<String> group1Slots = new ArrayList<String>();
+            getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().get(0).getWorkingSlots().forEach(slot -> {
+            	group1Slots.add(slot.getBoltId());
+            });
+         
+            StreamWorkSlotQueue queue2 = wrb.createQueue(new MonitoredStream(group2), group2.isDedicated(), 2, new HashMap<String, Object>());
+            print(context.getTopologyUsages().values());
+            
+            Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group2));
+            Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().size());
+            Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().get(0).getWorkingSlots().size());
+            getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().get(0).getWorkingSlots().forEach(slot -> {
+            	Assert.assertTrue(!group1Slots.contains(slot.getBoltId()));
+            });
+            
+
+            StreamWorkSlotQueue queue3 = wrb.createQueue(new MonitoredStream(group3), group3.isDedicated(), 2, new HashMap<String, Object>());
+            print(context.getTopologyUsages().values());
+            
+            Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group3));
+            Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group3).getQueues().size());
+            Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group3).getQueues().get(0).getWorkingSlots().size());
+            getMonitorStream(usage.getMonitoredStream()).get(group3).getQueues().get(0).getWorkingSlots().forEach(slot -> {
+            	Assert.assertTrue(!group1Slots.contains(slot.getBoltId()));
+            });
+            
+            StreamWorkSlotQueue queue4 = wrb.createQueue(new MonitoredStream(group4), group4.isDedicated(), 2, new HashMap<String, Object>());
+            print(context.getTopologyUsages().values());
+            
+            Assert.assertTrue(!getMonitorStream(usage.getMonitoredStream()).containsKey(group4));
+            
+        }
+    }
+    
+    @Test
+    public void testMultipleStreamsWithoutReuse() {
+    	ConfigFactory.invalidateCaches();
+        System.setProperty("config.resource", "/application-multiplestreams2.conf");
+    	
+        InMemScheduleConext context = new InMemScheduleConext();
+
+        StreamGroup group1 = createStreamGroup("s1", Arrays.asList("f1", "f2"), true);
+        StreamGroup group2 = createStreamGroup("s2", Arrays.asList("f2", "f3"), false);
+        StreamGroup group3 = createStreamGroup("s3", Arrays.asList("f4"), false);
+        StreamGroup group4 = createStreamGroup("s4", Arrays.asList("f5"), false);
+
+        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(3, 4, "prefix-time1", true);
+        WorkQueueBuilder wrb = new WorkQueueBuilder(context, mgmtService);
+        {
+            StreamWorkSlotQueue queue = wrb.createQueue(new MonitoredStream(group1), group1.isDedicated(), 2, new HashMap<String, Object>());
+            print(context.getTopologyUsages().values());
+            
+            TopologyUsage usage = context.getTopologyUsages().values().iterator().next();
+            
+            Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group1));
+            Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().size());
+            Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().get(0).getWorkingSlots().size());
+            
+            List<String> group1Slots = new ArrayList<String>();
+            getMonitorStream(usage.getMonitoredStream()).get(group1).getQueues().get(0).getWorkingSlots().forEach(slot -> {
+            	group1Slots.add(slot.getBoltId());
+            });
+         
+            StreamWorkSlotQueue queue2 = wrb.createQueue(new MonitoredStream(group2), group2.isDedicated(), 2, new HashMap<String, Object>());
+            print(context.getTopologyUsages().values());
+            
+            Assert.assertTrue(getMonitorStream(usage.getMonitoredStream()).containsKey(group2));
+            Assert.assertEquals(1, getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().size());
+            Assert.assertEquals(2, getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().get(0).getWorkingSlots().size());
+            getMonitorStream(usage.getMonitoredStream()).get(group2).getQueues().get(0).getWorkingSlots().forEach(slot -> {
+            	Assert.assertTrue(!group1Slots.contains(slot.getBoltId()));
+            });
+            
+
+            StreamWorkSlotQueue queue3 = wrb.createQueue(new MonitoredStream(group3), group3.isDedicated(), 2, new HashMap<String, Object>());
+            print(context.getTopologyUsages().values());
+            
+            Assert.assertTrue(!getMonitorStream(usage.getMonitoredStream()).containsKey(group3));
+            
+            StreamWorkSlotQueue queue4 = wrb.createQueue(new MonitoredStream(group4), group4.isDedicated(), 2, new HashMap<String, Object>());
+            print(context.getTopologyUsages().values());
+            
+            Assert.assertTrue(!getMonitorStream(usage.getMonitoredStream()).containsKey(group4));
+            
+        }
+    }
+    
+    private Map<StreamGroup, MonitoredStream> getMonitorStream(List<MonitoredStream> monitorStreams) {
+    	Map<StreamGroup, MonitoredStream> result = new HashMap<StreamGroup, MonitoredStream>();
+    	monitorStreams.forEach(monitorStream -> {
+    		result.put(monitorStream.getStreamGroup(), monitorStream);
+    	});
+    	return result;
+    }
+    
+    private StreamGroup createStreamGroup(String streamId, List<String> columns, boolean dedicated) {
+    	StreamPartition partition = new StreamPartition();
+        partition.setType(StreamPartition.Type.GLOBAL);
+        partition.setStreamId(streamId);
+        partition.setColumns(columns);
+
+        StreamGroup group = new StreamGroup();
+        group.addStreamPartition(partition, dedicated);
+        return group;
+    }
+    
+    private void print(Collection<TopologyUsage> usages) {
+    	try {
+    		ObjectMapper om = new ObjectMapper();
+        	LOG.info(">>>" + om.writeValueAsString(usages));
+    	} catch (Exception e) {}
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75ab7722/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application-multiplestreams.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application-multiplestreams.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application-multiplestreams.conf
new file mode 100644
index 0000000..9389d9b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application-multiplestreams.conf
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "coordinator": {
+    "policiesPerBolt": 5,
+    "boltParallelism": 5,
+    "policyDefaultParallelism": 5,
+    "boltLoadUpbound": 0.8,
+    "topologyLoadUpbound": 0.8,
+    "numOfAlertBoltsPerTopology": 5,
+    "reuseBoltInStreams": true,
+    "streamsPerBolt": 2,
+    "zkConfig": {
+      "zkQuorum": "localhost:2181",
+      "zkRoot": "/alert",
+      "zkSessionTimeoutMs": 10000,
+      "connectionTimeoutMs": 10000,
+      "zkRetryTimes": 3,
+      "zkRetryInterval": 3000
+    }
+    "metadataService": {
+      "host": "localhost",
+      "port": 8080,
+      "context": "/rest"
+    }
+    "metadataDynamicCheck": {
+      "initDelayMillis": 1000,
+      "delayMillis": 30000
+    },
+    "kafkaProducer": {
+      "bootstrapServers": "localhost:9092"
+    },
+    "email": {
+      "sender": "eagle@eagle.com",
+      "recipients": "test@eagle.com",
+      "mailSmtpHost": "test.eagle.com",
+      "mailSmtpPort": "25"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75ab7722/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application-multiplestreams2.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application-multiplestreams2.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application-multiplestreams2.conf
new file mode 100644
index 0000000..d75ccc4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/application-multiplestreams2.conf
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "coordinator": {
+    "policiesPerBolt": 5,
+    "boltParallelism": 5,
+    "policyDefaultParallelism": 5,
+    "boltLoadUpbound": 0.8,
+    "topologyLoadUpbound": 0.8,
+    "numOfAlertBoltsPerTopology": 5,
+    "streamsPerBolt": 5,
+    "zkConfig": {
+      "zkQuorum": "localhost:2181",
+      "zkRoot": "/alert",
+      "zkSessionTimeoutMs": 10000,
+      "connectionTimeoutMs": 10000,
+      "zkRetryTimes": 3,
+      "zkRetryInterval": 3000
+    }
+    "metadataService": {
+      "host": "localhost",
+      "port": 8080,
+      "context": "/rest"
+    }
+    "metadataDynamicCheck": {
+      "initDelayMillis": 1000,
+      "delayMillis": 30000
+    },
+    "kafkaProducer": {
+      "bootstrapServers": "localhost:9092"
+    },
+    "email": {
+      "sender": "eagle@eagle.com",
+      "recipients": "test@eagle.com",
+      "mailSmtpHost": "test.eagle.com",
+      "mailSmtpPort": "25"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75ab7722/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSerializableUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSerializableUtils.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSerializableUtils.java
index 0016086..9767d9e 100644
--- a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSerializableUtils.java
+++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestSerializableUtils.java
@@ -17,10 +17,12 @@
 package org.apache.eagle.common;
 
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.Serializable;
 
+@Ignore
 public class TestSerializableUtils {
     @Test
     public void testSerializeObject() {