You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/08 07:14:16 UTC

[15/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties
index fb13ad5..ba06033 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/log4j.properties
@@ -12,9 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 log4j.rootLogger=DEBUG, stdout
-
 # standard output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
index 205905b..5a78b6a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/streamDef.json
@@ -1,42 +1,45 @@
 {
-	"streamId": "perfmon_cpu_stream",
-	"dataSource" : "perfmon_datasource",
-	"description":"the data stream for perfmon cpu metrics",
-	"validate": false,
-	"timeseries":false,
-	"columns": [
-		{
-			"name": "host",
-			"type" : "string",
-			"defaultValue": "",
-			"required":true
-		},
-		{
-			"name": "timestamp",
-			"type" : "long",
-			"defaultValue": 0,
-			"required":true
-		},{
-			"name": "floatField",
-			"type" : "float",
-			"defaultValue": "1.2",
-			"required": true
-		},{
-			"name": "intField",
-			"type" : "int",
-			"defaultValue": "3",
-			"required":true
-		},{
-			"name": "value",
-			"type" : "double",
-			"defaultValue": 0.0,
-			"required":true
-		},
-		{
-			"name": "boolField",
-			"type" : "bool",
-			"defaultValue": true,
-			"required":true
-		}
-	]
+  "streamId": "perfmon_cpu_stream",
+  "dataSource": "perfmon_datasource",
+  "description": "the data stream for perfmon cpu metrics",
+  "validate": false,
+  "timeseries": false,
+  "columns": [
+    {
+      "name": "host",
+      "type": "string",
+      "defaultValue": "",
+      "required": true
+    },
+    {
+      "name": "timestamp",
+      "type": "long",
+      "defaultValue": 0,
+      "required": true
+    },
+    {
+      "name": "floatField",
+      "type": "float",
+      "defaultValue": "1.2",
+      "required": true
+    },
+    {
+      "name": "intField",
+      "type": "int",
+      "defaultValue": "3",
+      "required": true
+    },
+    {
+      "name": "value",
+      "type": "double",
+      "defaultValue": 0.0,
+      "required": true
+    },
+    {
+      "name": "boolField",
+      "type": "bool",
+      "defaultValue": true,
+      "required": true
+    }
+  ]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
index bd06919..4589e63 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/pom.xml
@@ -50,16 +50,16 @@
             <artifactId>jersey-multipart</artifactId>
         </dependency>
         <!--<dependency>-->
-            <!--<groupId>org.codehaus.jackson</groupId>-->
-            <!--<artifactId>jackson-mapper-asl</artifactId>-->
+        <!--<groupId>org.codehaus.jackson</groupId>-->
+        <!--<artifactId>jackson-mapper-asl</artifactId>-->
         <!--</dependency>-->
         <!--<dependency>-->
-            <!--<groupId>org.codehaus.jackson</groupId>-->
-            <!--<artifactId>jackson-jaxrs</artifactId>-->
+        <!--<groupId>org.codehaus.jackson</groupId>-->
+        <!--<artifactId>jackson-jaxrs</artifactId>-->
         <!--</dependency>-->
         <!--<dependency>-->
-            <!--<groupId>org.codehaus.jackson</groupId>-->
-            <!--<artifactId>jackson-xc</artifactId>-->
+        <!--<groupId>org.codehaus.jackson</groupId>-->
+        <!--<artifactId>jackson-xc</artifactId>-->
         <!--</dependency>-->
         <dependency>
             <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
index 5c455f6..563db58 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
@@ -16,15 +16,6 @@
  */
 package org.apache.eagle.alert.coordinator;
 
-import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.eagle.alert.config.ConfigBusProducer;
 import org.apache.eagle.alert.config.ConfigValue;
 import org.apache.eagle.alert.config.ZKConfig;
@@ -37,32 +28,43 @@ import org.apache.eagle.alert.coordinator.trigger.PolicyChangeListener;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.base.Stopwatch;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * TODO: To simply avoid concurrent call of schdule, make the schedule as synchronized. This is not safe when multiple
  * instance, consider a distributed lock for prevent multiple schedule happen concurrently.
+ *
+ * <p>Coordinator is a standalone Java application that listens for policy changes and uses a scheduling
+ * algorithm to distribute policies. It 1) reacts to shutdown events and 2) starts a background thread that pulls
+ * policies and determines whether any policies have changed.</p>
  * 
- * @since Mar 24, 2016 Coordinator is a standalone java application, which listens to policy changes and use schedule
- *        algorithm to distribute policies 1) reacting to shutdown events 2) start non-daemon thread to pull policies
- *        and figure out if polices are changed
+ * @since Mar 24, 2016 
  */
 public class Coordinator {
 
     private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
 
     private static final String COORDINATOR = "coordinator";
+    
     /**
-     * {@link ZKMetadataChangeNotifyService}
-     *  /alert/{topologyName}/spout
-     *                  /router
-     *                  /alert
-     *                  /publisher
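+     * ZooKeeper config paths, one per component under the topology root:
+     * /alert/{topologyName}/spout,
+     * /alert/{topologyName}/router,
+     * /alert/{topologyName}/alert,
+     * /alert/{topologyName}/publisher.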
      */
     private static final String ZK_ALERT_CONFIG_SPOUT = "{0}/spout";
     private static final String ZK_ALERT_CONFIG_ROUTER = "{0}/router";
@@ -70,16 +72,16 @@ public class Coordinator {
     private static final String ZK_ALERT_CONFIG_PUBLISHER = "{0}/publisher";
 
 
-    private final static String METADATA_SERVICE_HOST = "metadataService.host";
-    private final static String METADATA_SERVICE_PORT = "metadataService.port";
-    private final static String METADATA_SERVICE_CONTEXT = "metadataService.context";
-    private final static String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis";
-    private final static String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis";
-    
-    private final static String GREEDY_SCHEDULER_ZK_PATH = "/alert/greedy/leader";
-    private final static String POLICY_SCHEDULER_ZK_PATH = "/alert/policy/leader";
-    private final static int ACQUIRE_LOCK_WAIT_INTERVAL_MS = 2000;
-    private final static int ACQUIRE_LOCK_MAX_RETRIES_TIMES = 90; //about 9 minutes
+    private static final String METADATA_SERVICE_HOST = "metadataService.host";
+    private static final String METADATA_SERVICE_PORT = "metadataService.port";
+    private static final String METADATA_SERVICE_CONTEXT = "metadataService.context";
+    private static final String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis";
+    private static final String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis";
+
+    private static final String GREEDY_SCHEDULER_ZK_PATH = "/alert/greedy/leader";
+    private static final String POLICY_SCHEDULER_ZK_PATH = "/alert/policy/leader";
+    private static final int ACQUIRE_LOCK_WAIT_INTERVAL_MS = 2000;
+    private static final int ACQUIRE_LOCK_MAX_RETRIES_TIMES = 90; // about 3 minutes (90 retries x 2000 ms wait interval)
 
     private volatile ScheduleState currentState = null;
     private final ConfigBusProducer producer;
@@ -103,58 +105,60 @@ public class Coordinator {
     }
 
     public synchronized ScheduleState schedule(ScheduleOption option) {
-    	ScheduleZkState scheduleZkState = new ScheduleZkState();
-    	ExclusiveExecutor.Runnable exclusiveRunnable = new ExclusiveExecutor.Runnable() {
-			@Override
-			public void run() throws Exception {
-				scheduleZkState.scheduleAcquired = true;
-				
-				while (!scheduleZkState.scheduleCompleted) {
-					Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
-				}
-			}
-    	};
-    	ExclusiveExecutor.execute(GREEDY_SCHEDULER_ZK_PATH, exclusiveRunnable);
-    	int waitMaxTimes = 0;
-    	while (waitMaxTimes < ACQUIRE_LOCK_MAX_RETRIES_TIMES) { //about 3 minutes waiting
-    		if (!scheduleZkState.scheduleAcquired) {
-    			waitMaxTimes ++;
-    			try {
-					Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
-				} catch (InterruptedException e) {}
-    			continue;
-    		}
-    		
-    		ScheduleState state = null;
-    		try {
-    			Stopwatch watch = Stopwatch.createStarted();
-    	        IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext();
-    	        TopologyMgmtService mgmtService = new TopologyMgmtService();
-    	        IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
-    	
-    	        scheduler.init(context, mgmtService);
-    	        state = scheduler.schedule(option);
-    	        
-    	        long scheduleTime = watch.elapsed(TimeUnit.MILLISECONDS);
-    	        state.setScheduleTimeMillis((int) scheduleTime);// hardcode to integer
-    	        watch.reset();
-    	        watch.start();
-    	
-    	        // persist & notify
-    	        postSchedule(client, state, producer);
-    	
-    	        watch.stop();
-    	        long postTime = watch.elapsed(TimeUnit.MILLISECONDS);
-    	        LOG.info("Schedule result, schedule time {} ms, post schedule time {} ms !", scheduleTime, postTime);
-    	
-    	        currentState = state;
-    		} finally {
-    			//schedule completed
-    			scheduleZkState.scheduleCompleted = true;
-    		}
-	        return state;
-    	}
-    	throw new LockWebApplicationException("Acquire scheduler lock failed, please retry later");
+        ScheduleZkState scheduleZkState = new ScheduleZkState();
+        ExclusiveExecutor.Runnable exclusiveRunnable = new ExclusiveExecutor.Runnable() {
+            @Override
+            public void run() throws Exception {
+                scheduleZkState.scheduleAcquired = true;
+
+                while (!scheduleZkState.scheduleCompleted) {
+                    Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
+                }
+            }
+        };
+        ExclusiveExecutor.execute(GREEDY_SCHEDULER_ZK_PATH, exclusiveRunnable);
+        int waitMaxTimes = 0;
+        while (waitMaxTimes < ACQUIRE_LOCK_MAX_RETRIES_TIMES) { //about 3 minutes waiting
+            if (!scheduleZkState.scheduleAcquired) {
+                waitMaxTimes++;
+                try {
+                    Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    // ignored
+                }
+                continue;
+            }
+
+            ScheduleState state = null;
+            try {
+                Stopwatch watch = Stopwatch.createStarted();
+                IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext();
+                TopologyMgmtService mgmtService = new TopologyMgmtService();
+                IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
+
+                scheduler.init(context, mgmtService);
+                state = scheduler.schedule(option);
+
+                long scheduleTime = watch.elapsed(TimeUnit.MILLISECONDS);
+                state.setScheduleTimeMillis((int) scheduleTime);// hardcode to integer
+                watch.reset();
+                watch.start();
+
+                // persist & notify
+                postSchedule(client, state, producer);
+
+                watch.stop();
+                long postTime = watch.elapsed(TimeUnit.MILLISECONDS);
+                LOG.info("Schedule result, schedule time {} ms, post schedule time {} ms !", scheduleTime, postTime);
+
+                currentState = state;
+            } finally {
+                //schedule completed
+                scheduleZkState.scheduleCompleted = true;
+            }
+            return state;
+        }
+        throw new LockWebApplicationException("Acquire scheduler lock failed, please retry later");
     }
 
     public static void postSchedule(IMetadataServiceClient client, ScheduleState state, ConfigBusProducer producer) {
@@ -187,7 +191,7 @@ public class Coordinator {
     }
 
     /**
-     * shutdown background threads and release various resources
+     * shutdown background threads and release various resources.
      */
     private static class CoordinatorShutdownHook implements Runnable {
         private static final Logger LOG = LoggerFactory.getLogger(CoordinatorShutdownHook.class);
@@ -215,7 +219,7 @@ public class Coordinator {
     }
 
     private static class PolicyChangeHandler implements PolicyChangeListener {
-        private final static Logger LOG = LoggerFactory.getLogger(PolicyChangeHandler.class);
+        private static final Logger LOG = LoggerFactory.getLogger(PolicyChangeHandler.class);
         private Config config;
         private IMetadataServiceClient client;
 
@@ -226,11 +230,11 @@ public class Coordinator {
 
         @Override
         public void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies,
-                Collection<String> removedPolicies, Collection<String> modifiedPolicies) {
+                                   Collection<String> removedPolicies, Collection<String> modifiedPolicies) {
             LOG.info("policy changed ... ");
             LOG.info("allPolicies: " + allPolicies + ", addedPolicies: " + addedPolicies + ", removedPolicies: "
-                    + removedPolicies + ", modifiedPolicies: " + modifiedPolicies);
-            
+                + removedPolicies + ", modifiedPolicies: " + modifiedPolicies);
+
             CoordinatorTrigger trigger = new CoordinatorTrigger(config, client);
             trigger.run();
 
@@ -239,48 +243,48 @@ public class Coordinator {
 
     public static void main(String[] args) throws Exception {
         startSchedule();
-        
+
         Thread.currentThread().join();
     }
 
     public static void startSchedule() {
-    	ExclusiveExecutor.execute(POLICY_SCHEDULER_ZK_PATH, new ExclusiveExecutor.Runnable() {
-			
-			@Override
-			public void run() throws Exception {
-		        Config config = ConfigFactory.load().getConfig(COORDINATOR);
-		        // build dynamic policy loader
-		        String host = config.getString(METADATA_SERVICE_HOST);
-		        int port = config.getInt(METADATA_SERVICE_PORT);
-		        String context = config.getString(METADATA_SERVICE_CONTEXT);
-		        IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context);
-		        DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
-		        loader.addPolicyChangeListener(new PolicyChangeHandler(config, client));
-
-		        // schedule dynamic policy loader
-		        long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS);
-		        long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS);
-		        ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2, new ThreadFactory() {
-		            @Override
-		            public Thread newThread(Runnable r) {
-		                Thread t = new Thread(r);
-		                t.setDaemon(true);
-		                return t;
-		            }
-		        });
-		        scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
-		        
-		        // 
-		        scheduleSrv.scheduleAtFixedRate(new CoordinatorTrigger(config, client), CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_DELAY,
-		                CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_INTERVAL, TimeUnit.MILLISECONDS);
-		        
-		        Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv)));
-		        LOG.info("Eagle Coordinator started ...");
-                
+        ExclusiveExecutor.execute(POLICY_SCHEDULER_ZK_PATH, new ExclusiveExecutor.Runnable() {
+
+            @Override
+            public void run() throws Exception {
+                Config config = ConfigFactory.load().getConfig(COORDINATOR);
+                // build dynamic policy loader
+                String host = config.getString(METADATA_SERVICE_HOST);
+                int port = config.getInt(METADATA_SERVICE_PORT);
+                String context = config.getString(METADATA_SERVICE_CONTEXT);
+                IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context);
+                DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
+                loader.addPolicyChangeListener(new PolicyChangeHandler(config, client));
+
+                // schedule dynamic policy loader
+                long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS);
+                long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS);
+                ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2, new ThreadFactory() {
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        Thread t = new Thread(r);
+                        t.setDaemon(true);
+                        return t;
+                    }
+                });
+                scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
+
+                //
+                scheduleSrv.scheduleAtFixedRate(new CoordinatorTrigger(config, client), CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_DELAY,
+                    CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_INTERVAL, TimeUnit.MILLISECONDS);
+
+                Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv)));
+                LOG.info("Eagle Coordinator started ...");
+
                 Thread.currentThread().join();
-			}
-			
-		});
+            }
+
+        });
     }
 
     public void enforcePeriodicallyBuild() {
@@ -294,10 +298,10 @@ public class Coordinator {
     public static boolean isPeriodicallyForceBuildEnable() {
         return forcePeriodicallyBuild.get();
     }
-    
+
     public static class ScheduleZkState {
-    	volatile boolean scheduleAcquired = false;
+        volatile boolean scheduleAcquired = false;
         volatile boolean scheduleCompleted = false;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
index 3458b3e..0a09de2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
@@ -16,10 +16,6 @@
  */
 package org.apache.eagle.alert.coordinator;
 
-/**
- * @since Apr 22, 2016
- *
- */
 public class CoordinatorConstants {
     public static final String CONFIG_ITEM_COORDINATOR = "coordinator";
     public static final String CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND = "topologyLoadUpbound";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
index c9bda16..7ebf26a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorListener.java
@@ -16,29 +16,27 @@
  */
 package org.apache.eagle.alert.coordinator;
 
-import javax.servlet.ServletContextEvent;
-import javax.servlet.ServletContextListener;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
 
 /**
- * @since Jun 16, 2016
- *
+ * @since Jun 16, 2016.
  */
 public class CoordinatorListener implements ServletContextListener {
-    
+
     private static final Logger LOG = LoggerFactory.getLogger(CoordinatorListener.class);
-    
+
     public CoordinatorListener() {
     }
-    
+
     @Override
     public void contextInitialized(ServletContextEvent sce) {
-    	LOG.info("start coordinator background tasks..");
+        LOG.info("start coordinator background tasks..");
         Coordinator.startSchedule();
     }
-    
+
     @Override
     public void contextDestroyed(ServletContextEvent sce) {
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
index 96e6fce..74329e3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
@@ -16,6 +16,10 @@
  */
 package org.apache.eagle.alert.coordinator;
 
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.config.ZKConfigBuilder;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -25,94 +29,88 @@ import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.CloseableUtils;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
 public class ExclusiveExecutor {
 
-	private static final Logger LOG = LoggerFactory.getLogger(ExclusiveExecutor.class);
-
-	// private static final String PATH = "/alert/listener/leader";
-	private static final String COORDINATOR = "coordinator";
-	private static final int ZK_RETRYPOLICY_SLEEP_TIME_MS = 1000;
-	private static final int ZK_RETRYPOLICY_MAX_RETRIES = 3;
+    private static final Logger LOG = LoggerFactory.getLogger(ExclusiveExecutor.class);
 
-	private static final CuratorFramework client;
+    // private static final String PATH = "/alert/listener/leader";
+    private static final String COORDINATOR = "coordinator";
+    private static final int ZK_RETRYPOLICY_SLEEP_TIME_MS = 1000;
+    private static final int ZK_RETRYPOLICY_MAX_RETRIES = 3;
 
-	static {
-		Config config = ConfigFactory.load().getConfig(COORDINATOR);
-		RetryPolicy retryPolicy = new ExponentialBackoffRetry(ZK_RETRYPOLICY_SLEEP_TIME_MS, ZK_RETRYPOLICY_MAX_RETRIES);
-		ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-		client = CuratorFrameworkFactory.newClient(zkConfig.zkQuorum, retryPolicy);
-		client.start();
-	}
+    private static final CuratorFramework client;
 
-	public static abstract class Runnable {
+    static {
+        Config config = ConfigFactory.load().getConfig(COORDINATOR);
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(ZK_RETRYPOLICY_SLEEP_TIME_MS, ZK_RETRYPOLICY_MAX_RETRIES);
+        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+        client = CuratorFrameworkFactory.newClient(zkConfig.zkQuorum, retryPolicy);
+        client.start();
+    }
 
-		boolean completed = false;
-		LeaderSelector selector;
+    public abstract static class Runnable {
+        boolean completed = false;
+        LeaderSelector selector;
 
-		public abstract void run() throws Exception;
+        public abstract void run() throws Exception;
 
-		public void registerResources(LeaderSelector selector) {
-			this.selector = selector;
-		}
+        public void registerResources(LeaderSelector selector) {
+            this.selector = selector;
+        }
 
-		public void runElegantly() throws Exception {
-			this.run();
+        public void runElegantly() throws Exception {
+            this.run();
 
-			LOG.info("Close selector resources {}", this.selector);
-			CloseableUtils.closeQuietly(this.selector);
+            LOG.info("Close selector resources {}", this.selector);
+            CloseableUtils.closeQuietly(this.selector);
 
-			completed = true;
-		}
+            completed = true;
+        }
 
-		public boolean isCompleted() {
-			return completed;
-		}
+        public boolean isCompleted() {
+            return completed;
+        }
 
-	}
+    }
 
-	public static void execute(String path, final Runnable runnable) {
-		LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() {
+    public static void execute(String path, final Runnable runnable) {
+        LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() {
 
-			@Override
-			public void takeLeadership(CuratorFramework client) throws Exception {
-				// this callback will get called when you are the leader
-				// do whatever leader work you need to and only exit
-				// this method when you want to relinquish leadership
-				LOG.info("this is leader node right now..");
-				runnable.runElegantly();
-			}
+            @Override
+            public void takeLeadership(CuratorFramework client) throws Exception {
+                // this callback will get called when you are the leader
+                // do whatever leader work you need to and only exit
+                // this method when you want to relinquish leadership
+                LOG.info("this is leader node right now..");
+                runnable.runElegantly();
+            }
 
-			@Override
-			public void stateChanged(CuratorFramework client, ConnectionState newState) {
-				LOG.info(String.format("leader selector state change listener, new state: %s", newState.toString()));
-			}
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState) {
+                LOG.info(String.format("leader selector state change listener, new state: %s", newState.toString()));
+            }
 
-		};
+        };
 
-		LeaderSelector selector = new LeaderSelector(client, path, listener);
-		selector.autoRequeue(); // not required, but this is behavior that you
-								// will probably expect
-		selector.start();
+        LeaderSelector selector = new LeaderSelector(client, path, listener);
+        selector.autoRequeue(); // not required, but this is behavior that you
+        // will probably expect
+        selector.start();
 
-		runnable.registerResources(selector);
+        runnable.registerResources(selector);
 
-		Runtime.getRuntime().addShutdownHook(new Thread(new java.lang.Runnable() {
+        Runtime.getRuntime().addShutdownHook(new Thread(new java.lang.Runnable() {
 
-			@Override
-			public void run() {
-				LOG.info("Close zk client resources {}", ExclusiveExecutor.client);
-				CloseableUtils.closeQuietly(ExclusiveExecutor.client);
-			}
+            @Override
+            public void run() {
+                LOG.info("Close zk client resources {}", ExclusiveExecutor.client);
+                CloseableUtils.closeQuietly(ExclusiveExecutor.client);
+            }
 
-		}));
-	}
+        }));
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
index 5e61443..0cf4a5d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
@@ -19,8 +19,7 @@ package org.apache.eagle.alert.coordinator;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 
 /**
- * @since Mar 24, 2016
- *
+ * @since Mar 24, 2016.
  */
 public interface IPolicyScheduler {
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
index 0cde22d..b21948b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
@@ -16,8 +16,6 @@
  */
 package org.apache.eagle.alert.coordinator;
 
-import java.util.Map;
-
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -28,9 +26,10 @@ import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 
+import java.util.Map;
+
 /**
- * @since Mar 28, 2016
- *
+ * @since Mar 28, 2016.
  */
 public interface IScheduleContext {
 
@@ -48,7 +47,7 @@ public interface IScheduleContext {
     Map<String, PolicyAssignment> getPolicyAssignments();
 
     Map<StreamGroup, MonitoredStream> getMonitoredStreams();
-    
+
     Map<String, Publishment> getPublishments();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
index 69e799f..29e8cac 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/LockWebApplicationException.java
@@ -21,14 +21,14 @@ import javax.ws.rs.core.Response;
 
 public class LockWebApplicationException extends WebApplicationException {
 
-	private static final long serialVersionUID = 3441072187262776401L;
+    private static final long serialVersionUID = 3441072187262776401L;
 
-	public LockWebApplicationException() {
-		super(Response.Status.INTERNAL_SERVER_ERROR);
-	}
+    public LockWebApplicationException() {
+        super(Response.Status.INTERNAL_SERVER_ERROR);
+    }
 
-	public LockWebApplicationException(String message) {
-		super(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(message).type("text/plain").build());
-	}
+    public LockWebApplicationException(String message) {
+        super(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(message).type("text/plain").build());
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
index 8bccb53..c1bc726 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
@@ -19,8 +19,7 @@ package org.apache.eagle.alert.coordinator;
 import org.apache.eagle.alert.coordinator.impl.GreedyPolicyScheduler;
 
 /**
- * @since Mar 24, 2016
- *
+ * @since Mar 24, 2016.
  */
 public class PolicySchedulerFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
index 6c04e61..6fe27c5 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
@@ -18,11 +18,9 @@ package org.apache.eagle.alert.coordinator;
 
 /**
  * A runtime option for one schedule processing.
- * 
- * Could used for configuration override.
- * 
- * @since Apr 19, 2016
+ * <p>Could be used for configuration override.</p>
  *
+ * @since Apr 19, 2016
  */
 public class ScheduleOption {
     private int policiesPerBolt;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
index 4ae07f5..4ca9d5e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
@@ -19,18 +19,16 @@ package org.apache.eagle.alert.coordinator;
 import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_COORDINATOR;
 import static org.apache.eagle.alert.coordinator.CoordinatorConstants.NUM_OF_ALERT_BOLTS_PER_TOPOLOGY;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.eagle.alert.coordination.model.internal.Topology;
 import org.apache.eagle.alert.coordinator.model.TopologyUsage;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import java.util.Collections;
+import java.util.List;
 
 /**
- * @since Mar 29, 2016
- *
+ * @since Mar 29, 2016.
  */
 public class TopologyMgmtService {
 
@@ -61,21 +59,19 @@ public class TopologyMgmtService {
         boltParallelism = config.getInt(CoordinatorConstants.BOLT_PARALLELISM);
         numberOfBoltsPerTopology = config.getInt(NUM_OF_ALERT_BOLTS_PER_TOPOLOGY);
     }
-    
+
     public int getNumberOfAlertBoltsInTopology() {
         return numberOfBoltsPerTopology;
     }
 
     /**
-     * TODO: call topology mgmt API to create a topology
-     * 
-     * @return
+     * TODO: call topology mgmt API to create a topology.
      */
     public TopologyMeta creatTopology() {
         // TODO
         throw new UnsupportedOperationException("not supported yet!");
     }
-    
+
     public List<TopologyMeta> listTopologies() {
         // TODO
         return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
index ebc533e..49a16ff 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
@@ -16,24 +16,10 @@
  */
 package org.apache.eagle.alert.coordinator.impl;
 
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_BOLT_LOAD_UPBOUND;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_COORDINATOR;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.POLICIES_PER_BOLT;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.POLICY_DEFAULT_PARALLELISM;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
+import static org.apache.eagle.alert.coordinator.CoordinatorConstants.*;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordination.model.internal.*;
 import org.apache.eagle.alert.coordinator.IPolicyScheduler;
 import org.apache.eagle.alert.coordinator.IScheduleContext;
 import org.apache.eagle.alert.coordinator.ScheduleOption;
@@ -44,31 +30,31 @@ import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
 
 /**
  * A simple greedy assigner. <br/>
- * A greedy assigner simply loop the policies, find the most suitable topology
+ *
+ * <p>A greedy assigner simply loops over the policies, finds the most suitable topology
  * to locate the policy first, then assign the topics to corresponding
- * spouts/group-by bolts.
- * 
- * <br/>
- * For each given policy, the greedy steps are
+ * spouts/group-by bolts.</p>
+ *
+ * <p>For each given policy, the greedy steps are</p>
+ *
  * <ul>
  * <li>1. Find the same topology that already serve the policy without exceed the load</li>
  * <li>2. Find the topology that already take the source traffic without exceed the load</li>
  * <li>3. Find the topology that available to place source topic without exceed the load</li>
  * <li>4. Create a new topology and locate the policy</li>
  * <li>Route table generated after all policies assigned</li>
- * <ul>
- * <br/>
- * 
+ * </ul>
  * @since Mar 24, 2016
- *
  */
 public class GreedyPolicyScheduler implements IPolicyScheduler {
 
@@ -111,8 +97,8 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
          * Answer: two possible place: a global route table will be generated, those target not in current topology tuples will be dropped. This make the partition for tuple to alert
          * <li>How to support add topology on demand by evaluate the available topology bandwidth(need topology level load)?</li>
          * Answer: Use configured topology load up-bound, when topology load is available, will adopt
-         * <ul>
-         * <pre>
+         * </ul>
+         * </pre>
          */
         List<ScheduleResult> results = new ArrayList<ScheduleResult>();
         Map<String, PolicyAssignment> newAssignments = new HashMap<String, PolicyAssignment>();
@@ -152,13 +138,13 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
     }
 
     private ScheduleState generateMonitorMetadata(List<WorkItem> expandworkSets,
-            Map<String, PolicyAssignment> newAssignments) {
+                                                  Map<String, PolicyAssignment> newAssignments) {
         MonitorMetadataGenerator generator = new MonitorMetadataGenerator(context);
         return generator.generate(expandworkSets);
     }
 
     private void placePolicy(PolicyDefinition def, AlertBoltUsage alertBoltUsage, Topology targetTopology,
-            TopologyUsage usage) {
+                             TopologyUsage usage) {
         String policyName = def.getName();
 
         // topology usage update
@@ -169,26 +155,26 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
 
         // update source topics
         updateDataSource(usage, def);
-        
+
         // update group-by
         updateGrouping(usage, def);
     }
 
     private void updateGrouping(TopologyUsage usage, PolicyDefinition def) {
         // groupByMeta is removed since groupspec generate doesnt need it now. 
-//        List<StreamPartition> policyPartitionSpec = def.getPartitionSpec();
-//        Map<String, List<StreamPartition>> groupByMeta = usage.getGroupByMeta();
-//        for (StreamPartition par : policyPartitionSpec) {
-//            List<StreamPartition> partitions = groupByMeta.get(par.getStreamId());
-//            if (partitions == null) {
-//                partitions = new ArrayList<StreamPartition>();
-//                // de-dup of the partition on the list?
-//                groupByMeta.put(par.getStreamId(), partitions);
-//            }
-//            if (!partitions.contains(par)) {
-//                partitions.add(par);
-//            }
-//        }
+        //        List<StreamPartition> policyPartitionSpec = def.getPartitionSpec();
+        //        Map<String, List<StreamPartition>> groupByMeta = usage.getGroupByMeta();
+        //        for (StreamPartition par : policyPartitionSpec) {
+        //            List<StreamPartition> partitions = groupByMeta.get(par.getStreamId());
+        //            if (partitions == null) {
+        //                partitions = new ArrayList<StreamPartition>();
+        //                // de-dup of the partition on the list?
+        //                groupByMeta.put(par.getStreamId(), partitions);
+        //            }
+        //            if (!partitions.contains(par)) {
+        //                partitions.add(par);
+        //            }
+        //        }
     }
 
     private void updateDataSource(TopologyUsage usage, PolicyDefinition def) {
@@ -216,13 +202,13 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
      * <li>3. Find the topology that available to place source topic</li>
      * <li>4. Create a new topology and locate the policy</li>
      * <li>Route table generated after all policies assigned</li>
-     * <ul>
+     * </ul>
      * <br/>
-     * 
+     *
      * @param newAssignments
      */
     private ScheduleResult schedulePolicy(WorkItem item, Map<String, PolicyAssignment> newAssignments) {
-        LOG.info(" schedule for {}", item );
+        LOG.info(" schedule for {}", item);
 
         String policyName = item.def.getName();
         StreamGroup policyStreamPartition = new StreamGroup();
@@ -260,14 +246,14 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
     }
 
     private void placePolicyToQueue(PolicyDefinition def, StreamWorkSlotQueue queue,
-            Map<String, PolicyAssignment> newAssignments) {
+                                    Map<String, PolicyAssignment> newAssignments) {
         for (WorkSlot slot : queue.getWorkingSlots()) {
             Topology targetTopology = context.getTopologies().get(slot.getTopologyName());
             TopologyUsage usage = context.getTopologyUsages().get(slot.getTopologyName());
             AlertBoltUsage alertBoltUsage = usage.getAlertBoltUsage(slot.getBoltId());
             placePolicy(def, alertBoltUsage, targetTopology, usage);
         }
-//        queue.placePolicy(def);
+        // queue.placePolicy(def);
         PolicyAssignment assignment = new PolicyAssignment(def.getName(), queue.getQueueId());
         context.getPolicyAssignments().put(def.getName(), assignment);
         newAssignments.put(def.getName(), assignment);
@@ -286,22 +272,19 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
             WorkQueueBuilder builder = new WorkQueueBuilder(context, mgmtService);
             // TODO : get the properties from policy definiton
             targetQueue = builder.createQueue(targetdStream, false, getQueueSize(def.getParallelismHint()),
-                    new HashMap<String, Object>());
+                new HashMap<String, Object>());
         }
         return targetQueue;
     }
 
     /**
-     * Some strategy to generate correct size in Startegy of queue builder
-     * 
-     * @param hint
-     * @return
+     * Some strategy to generate correct size in Strategy of queue builder.
      */
     private int getQueueSize(int hint) {
-    	if (hint == 0) {
-    		// some policies require single bolt to execute
-    		return 1;
-    	}
+        if (hint == 0) {
+            // some policies require single bolt to execute
+            return 1;
+        }
         return initialQueueSize * ((hint + initialQueueSize - 1) / initialQueueSize);
     }
 
@@ -323,7 +306,7 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
     private boolean isBoltAvailable(AlertBoltUsage boltUsage, PolicyDefinition def) {
         // overload or over policy # or already contains
         if (boltUsage == null || boltUsage.getLoad() > boltLoadUpbound
-                || boltUsage.getPolicies().size() > policiesPerBolt || boltUsage.getPolicies().contains(def.getName())) {
+            || boltUsage.getPolicies().size() > policiesPerBolt || boltUsage.getPolicies().contains(def.getName())) {
             return false;
         }
         return true;
@@ -333,7 +316,7 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
         this.context = new InMemScheduleConext(context);
         this.mgmtService = mgmtService;
     }
-    
+
     public IScheduleContext getContext() {
         return context;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
index 40f16e9..c5c992b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
@@ -16,22 +16,7 @@
  */
 package org.apache.eagle.alert.coordinator.impl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.coordination.model.*;
 import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
 import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
@@ -46,12 +31,18 @@ import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
- * @since Apr 26, 2016
  * Given current policy placement, figure out monitor metadata
- * 
- * TODO: refactor to eliminate the duplicate of stupid if-notInMap-then-create....
- * FIXME: too many duplicated code logic : check null; add list to map; add to list.. 
+ *
+ * <p>TODO: refactor to eliminate the duplicate of stupid if-notInMap-then-create....
+ * FIXME: too many duplicated code logic : check null; add list to map; add to list..</p>
+ *
+ * @since Apr 26, 2016
  */
 public class MonitorMetadataGenerator {
 
@@ -76,15 +67,15 @@ public class MonitorMetadataGenerator {
         Map<String, PublishSpec> publishSpecsMap = generatePublishMetadata();
 
         String uniqueVersion = generateVersion();
-        ScheduleState status = new ScheduleState(uniqueVersion, 
-                topoSpoutSpecsMap, 
-                groupSpecsMap, 
-                alertSpecsMap,
-                publishSpecsMap, 
-                context.getPolicyAssignments().values(), 
-                context.getMonitoredStreams().values(),
-                context.getPolicies().values(),
-                context.getStreamSchemas().values());
+        ScheduleState status = new ScheduleState(uniqueVersion,
+            topoSpoutSpecsMap,
+            groupSpecsMap,
+            alertSpecsMap,
+            publishSpecsMap,
+            context.getPolicyAssignments().values(),
+            context.getMonitoredStreams().values(),
+            context.getPolicies().values(),
+            context.getStreamSchemas().values());
         return status;
     }
 
@@ -127,7 +118,7 @@ public class MonitorMetadataGenerator {
     }
 
     /**
-     * FIXME: add auto-increment version number?
+     * FIXME: consider adding an auto-increment version number.
      */
     private String generateVersion() {
         return "spec_version_" + System.currentTimeMillis();
@@ -159,7 +150,7 @@ public class MonitorMetadataGenerator {
                 spec = new RouterSpec(u.getTopoName());
                 groupSpecsMap.put(u.getTopoName(), spec);
             }
-            
+
             for (MonitoredStream ms : u.getMonitoredStream()) {
                 // mutiple stream on the same policy group : for correlation group case:
                 for (StreamPartition partiton : ms.getStreamGroup().getStreamPartitions()) {
@@ -184,7 +175,7 @@ public class MonitorMetadataGenerator {
 
     private Map<String, SpoutSpec> generateSpoutMonitorMetadata() {
         Map<String, StreamWorkSlotQueue> queueMap = buildQueueMap();
-        
+
         Map<String, SpoutSpec> topoSpoutSpecsMap = new HashMap<String, SpoutSpec>();
         // streamName -> StreamDefinition
         Map<String, StreamDefinition> streamSchemaMap = context.getStreamSchemas();
@@ -207,7 +198,7 @@ public class MonitorMetadataGenerator {
             Map<String, List<StreamRepartitionMetadata>> streamsMap = new HashMap<String, List<StreamRepartitionMetadata>>();
             for (String policyName : usage.getPolicies()) {
                 PolicyDefinition def = context.getPolicies().get(policyName);
-                
+
                 PolicyAssignment assignment = context.getPolicyAssignments().get(policyName);
                 if (assignment == null) {
                     LOG.error(" can not find assignment for policy {} ! ", policyName);
@@ -245,9 +236,7 @@ public class MonitorMetadataGenerator {
 
     /**
      * Work queue not a root level object, thus we need to build a map from
-     * MonitoredStream for later quick lookup
-     * 
-     * @return
+     * MonitoredStream for later quick lookup.
      */
     private Map<String, StreamWorkSlotQueue> buildQueueMap() {
         Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>();
@@ -260,7 +249,7 @@ public class MonitorMetadataGenerator {
     }
 
     private void addGroupingStrategy(Map<String, List<StreamRepartitionMetadata>> streamsMap, String stream,
-            StreamDefinition schema, String topicName, String datasourceName, StreamRepartitionStrategy gs) {
+                                     StreamDefinition schema, String topicName, String datasourceName, StreamRepartitionStrategy gs) {
         List<StreamRepartitionMetadata> dsStreamMeta;
         if (streamsMap.containsKey(topicName)) {
             dsStreamMeta = streamsMap.get(topicName);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
index ea96d79..a46537d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
@@ -16,17 +16,14 @@
  */
 package org.apache.eagle.alert.coordinator.impl;
 
-import java.util.List;
-
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import java.util.List;
 
 /**
- * Schedule result for one policy
- * 
- * 
- * @since Apr 26, 2016
+ * Schedule result for one policy.
  *
+ * @since Apr 26, 2016
  */
 public class ScheduleResult {
     int code;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
index a32b8fb..f44f80e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
@@ -16,10 +16,6 @@
  */
 package org.apache.eagle.alert.coordinator.impl;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.eagle.alert.coordination.model.WorkSlot;
 import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
 import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
@@ -32,9 +28,12 @@ import org.apache.eagle.alert.coordinator.model.TopologyUsage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
- * @since Apr 27, 2016
- *
+ * @since Apr 27, 2016.
  */
 public class WorkQueueBuilder {
 
@@ -49,7 +48,7 @@ public class WorkQueueBuilder {
     }
 
     public StreamWorkSlotQueue createQueue(MonitoredStream stream, boolean isDedicated, int size,
-            Map<String, Object> properties) {
+                                           Map<String, Object> properties) {
         // FIXME: make extensible and configurable
         IWorkSlotStrategy strategy = new SameTopologySlotStrategy(context, stream.getStreamGroup(), mgmtService);
         List<WorkSlot> slots = strategy.reserveWorkSlots(size, isDedicated, properties);
@@ -58,7 +57,7 @@ public class WorkQueueBuilder {
             return null;
         }
         StreamWorkSlotQueue queue = new StreamWorkSlotQueue(stream.getStreamGroup(), isDedicated, properties,
-                slots);
+            slots);
         calculateGroupIndexAndCount(queue);
         assignQueueSlots(stream, queue);// build reverse reference
         stream.addQueues(queue);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
index 28df3c4..8528606 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
@@ -16,14 +16,12 @@
  */
 package org.apache.eagle.alert.coordinator.impl.strategies;
 
+import org.apache.eagle.alert.coordination.model.WorkSlot;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-
 /**
- * @since Apr 27, 2016
- *
+ * @since Apr 27, 2016.
  */
 public interface IWorkSlotStrategy {
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
index e755237..823a548 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
@@ -17,13 +17,6 @@
 package org.apache.eagle.alert.coordinator.impl.strategies;
 
 import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.eagle.alert.coordination.model.WorkSlot;
 import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
 import org.apache.eagle.alert.coordination.model.internal.Topology;
@@ -33,22 +26,22 @@ import org.apache.eagle.alert.coordinator.TopologyMgmtService;
 import org.apache.eagle.alert.coordinator.TopologyMgmtService.TopologyMeta;
 import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
 import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.*;
 
 /**
  * A simple strategy that only find the bolts in the same topology as the
  * required work slots.
- * 
- * Invariant:<br/>
+ *
+ * <p>Invariant:<br/>
  * One slot queue only on the one topology.<br/>
- * One topology doesn't contains two same partition slot queues.
- * 
- * @since Apr 27, 2016
+ * One topology doesn't contain two same partition slot queues.</p>
  *
+ * @since Apr 27, 2016
  */
 public class SameTopologySlotStrategy implements IWorkSlotStrategy {
 
@@ -58,28 +51,27 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy {
     private final StreamGroup partitionGroup;
     private final TopologyMgmtService mgmtService;
 
-//    private final int numOfPoliciesBoundPerBolt;
+    //    private final int numOfPoliciesBoundPerBolt;
     private final double topoLoadUpbound;
 
     public SameTopologySlotStrategy(IScheduleContext context, StreamGroup streamPartitionGroup,
-            TopologyMgmtService mgmtService) {
+                                    TopologyMgmtService mgmtService) {
         this.context = context;
         this.partitionGroup = streamPartitionGroup;
         this.mgmtService = mgmtService;
 
         Config config = ConfigFactory.load().getConfig(CoordinatorConstants.CONFIG_ITEM_COORDINATOR);
-//        numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT);
+        // numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT);
         topoLoadUpbound = config.getDouble(CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND);
     }
 
     /**
-     * @param isDedicated
-     *            - not used yet!
+     * @param isDedicated - not used yet.
      */
     @Override
     public List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties) {
         Iterator<Topology> it = context.getTopologies().values().stream().filter((t) -> t.getNumOfAlertBolt() >= size)
-                .iterator();
+            .iterator();
         // priority strategy first???
         List<WorkSlot> slots = new ArrayList<WorkSlot>();
         while (it.hasNext()) {
@@ -139,15 +131,15 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy {
     }
 
     private boolean isTopologyAvailable(TopologyUsage u) {
-//        for (MonitoredStream stream : u.getMonitoredStream()) {
-//            if (partition.equals(stream.getStreamParitition())) {
-//                return false;
-//            }
-//        }
+        //        for (MonitoredStream stream : u.getMonitoredStream()) {
+        //            if (partition.equals(stream.getStreamParitition())) {
+        //                return false;
+        //            }
+        //        }
         if (u == null || u.getLoad() > topoLoadUpbound) {
             return false;
         }
-        
+
         return true;
     }
 
@@ -158,7 +150,7 @@ public class SameTopologySlotStrategy implements IWorkSlotStrategy {
         }
         // actually it's now 0;
         return true;
-//        return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt;
+        //  return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
index e9148f5..36c0bce 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
@@ -16,16 +16,14 @@
  */
 package org.apache.eagle.alert.coordinator.model;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
 import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
- * @since Mar 28, 2016
- *
+ * @since Mar 28, 2016.
  */
 public class AlertBoltUsage {
 
@@ -56,9 +54,9 @@ public class AlertBoltUsage {
     public void addPolicies(PolicyDefinition pd) {
         policies.add(pd.getName());
         // add first partition
-//        for (StreamPartition par : pd.getPartitionSpec()) {
-//            partitions.add(par);
-//        }
+        //        for (StreamPartition par : pd.getPartitionSpec()) {
+        //            partitions.add(par);
+        //        }
     }
 
     public double getLoad() {
@@ -76,7 +74,7 @@ public class AlertBoltUsage {
     public List<StreamWorkSlotQueue> getReferQueues() {
         return referQueues;
     }
-    
+
     public int getQueueSize() {
         return referQueues.size();
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
index 86238d1..39788d5 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
@@ -16,24 +16,22 @@
  */
 package org.apache.eagle.alert.coordinator.model;
 
-
 /**
- * @since Mar 28, 2016
- *
+ * @since Mar 28, 2016.
  */
 public class GroupBoltUsage {
 
     private String boltId;
     private double load;
-    
+
     public GroupBoltUsage(String boltId) {
         this.boltId = boltId;
     }
 
-//    private final Set<String> streams = new HashSet<String>();
-//    private final Map<String, StreamFilter> filters = new HashMap<String, StreamFilter>();
+    //    private final Set<String> streams = new HashSet<String>();
+    //    private final Map<String, StreamFilter> filters = new HashMap<String, StreamFilter>();
 
-//    private final Map<String, List<StreamPartition>> groupByMeta;
+    //    private final Map<String, List<StreamPartition>> groupByMeta;
 
     public double getLoad() {
         return load;
@@ -43,18 +41,18 @@ public class GroupBoltUsage {
         this.load = load;
     }
 
-//    public Set<String> getStreams() {
-//        return streams;
-//    }
-//
-//
-//    public Map<String, StreamFilter> getFilters() {
-//        return filters;
-//    }
+    //    public Set<String> getStreams() {
+    //        return streams;
+    //    }
+    //
+    //
+    //    public Map<String, StreamFilter> getFilters() {
+    //        return filters;
+    //    }
 
-//    public Map<String, List<StreamPartition>> getGroupByMeta() {
-//        return groupByMeta;
-//    }
+    //    public Map<String, List<StreamPartition>> getGroupByMeta() {
+    //        return groupByMeta;
+    //    }
 
     public String getBoltId() {
         return boltId;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
index 6eb6195..3cfc505 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.coordinator.model;
 
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -23,11 +24,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-
 /**
- * @since Mar 27, 2016
- *
+ * @since Mar 27, 2016.
  */
 public class TopologyUsage {
     // topo info
@@ -48,11 +46,11 @@ public class TopologyUsage {
 
     public TopologyUsage() {
     }
-    
+
     public TopologyUsage(String name) {
         this.topoName = name;
     }
-    
+
     public String getTopoName() {
         return topoName;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
index dea9419..8f6cbc6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
@@ -16,9 +16,6 @@
  */
 package org.apache.eagle.alert.coordinator.provider;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -29,10 +26,11 @@ import org.apache.eagle.alert.coordinator.model.TopologyUsage;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
- * @since Mar 28, 2016
- *
+ * @since Mar 28, 2016.
  */
 public class InMemScheduleConext implements IScheduleContext {
 
@@ -60,9 +58,9 @@ public class InMemScheduleConext implements IScheduleContext {
     }
 
     public InMemScheduleConext(Map<String, Topology> topologies2, Map<String, PolicyAssignment> assignments,
-            Map<String, Kafka2TupleMetadata> kafkaSources, Map<String, PolicyDefinition> policies2,
-            Map<String, Publishment> publishments2, Map<String, StreamDefinition> streamDefinitions,
-            Map<StreamGroup, MonitoredStream> monitoredStreamMap, Map<String, TopologyUsage> usages2) {
+                               Map<String, Kafka2TupleMetadata> kafkaSources, Map<String, PolicyDefinition> policies2,
+                               Map<String, Publishment> publishments2, Map<String, StreamDefinition> streamDefinitions,
+                               Map<StreamGroup, MonitoredStream> monitoredStreamMap, Map<String, TopologyUsage> usages2) {
         this.topologies = topologies2;
         this.policyAssignments = assignments;
         this.datasources = kafkaSources;