You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/09/08 07:14:17 UTC

[16/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java
index 42bf5d5..bf0b365 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java
@@ -16,198 +16,193 @@
  */
 package org.apache.eagle.alert.metric.reporter;
 
-import java.util.Map;
-import java.util.Properties;
-import java.util.SortedMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import org.apache.eagle.alert.metric.entity.MetricEvent;
 import org.apache.eagle.alert.utils.ByteUtils;
+import com.codahale.metrics.*;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.Timer;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
+import java.util.Map;
+import java.util.Properties;
+import java.util.SortedMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 public class KafkaReporter extends ScheduledReporter {
-	private final static Logger LOG = LoggerFactory.getLogger(KafkaReporter.class);
-	private final String topic;
-	private final Properties properties;
-	private final Producer<byte[], String> producer;
-	private final Map<String, Object> additionalFields;
-
-	protected KafkaReporter(MetricRegistry registry, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit, String topic, Properties config, Map<String, Object> additionalFields) {
-		super(registry, "kafka-reporter", filter, rateUnit, durationUnit);
-		this.topic = topic;
-		this.properties = new Properties();
-		Preconditions.checkNotNull(topic,"topic should not be null");
-//		properties.put("bootstrap.servers", brokerList);
-//		properties.put("metadata.broker.list", brokerList);
-		properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-		properties.put("request.required.acks", "1");
-		properties.put("key.deserializer","org.apache.kafka.common.serialization.ByteArraySerializer");
-		properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
-		if(config != null) {
-			LOG.info(config.toString());
-			properties.putAll(config);
-		}
-		this.additionalFields = additionalFields;
-		this.producer = new KafkaProducer<>(properties);
-		LOG.info("Initialized kafka-reporter");
-	}
-
-	@SuppressWarnings("rawtypes")
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaReporter.class);
+    private final String topic;
+    private final Properties properties;
+    private final Producer<byte[], String> producer;
+    private final Map<String, Object> additionalFields;
+
+    protected KafkaReporter(MetricRegistry registry, MetricFilter filter, TimeUnit rateUnit, TimeUnit durationUnit, String topic, Properties config, Map<String, Object> additionalFields) {
+        super(registry, "kafka-reporter", filter, rateUnit, durationUnit);
+        this.topic = Preconditions.checkNotNull(topic, "topic should not be null");
+        this.properties = new Properties();
+        // Defaults below are applied first, so any same-named entry in the caller's config overrides them.
+        // properties.put("bootstrap.servers", brokerList);
+        // properties.put("metadata.broker.list", brokerList);
+        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        properties.put("request.required.acks", "1");
+        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        if (config != null) {
+            LOG.info(config.toString());
+            properties.putAll(config);
+        }
+        this.additionalFields = additionalFields;
+        this.producer = new KafkaProducer<>(properties);
+        LOG.info("Initialized kafka-reporter");
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters,
+                       SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) {
+        for (SortedMap.Entry<String, Gauge> entry : gauges.entrySet()) {
+            onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
+        }
+        for (SortedMap.Entry<String, Counter> entry : counters.entrySet()) {
+            onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
+        }
+        for (SortedMap.Entry<String, Histogram> entry : histograms.entrySet()) {
+            onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
+        }
+        for (SortedMap.Entry<String, Meter> entry : meters.entrySet()) {
+            onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
+        }
+        for (SortedMap.Entry<String, Timer> entry : timers.entrySet()) {
+            onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
+        }
+    }
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private void onMetricEvent(MetricEvent event) {
+        try {
+            if (additionalFields != null) {
+                event.putAll(additionalFields);
+            }
+            // TODO: Support configurable partition key
+            byte[] key = ByteUtils.intToBytes(event.hashCode());
+            ProducerRecord<byte[], String> record = new ProducerRecord<>(topic, key, OBJECT_MAPPER.writeValueAsString(event));
+            // TODO: Support configurable timeout
+            this.producer.send(record).get(5, TimeUnit.SECONDS);
+        } catch (JsonProcessingException e) {
+            LOG.error("Failed to serialize {} as json", event, e);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.error("Failed to produce message to topic {}", topic, e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        this.producer.close();
+        super.stop();
+    }
+
     @Override
-	public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters, SortedMap<String, Timer> timers) {
-		for(SortedMap.Entry<String, Gauge> entry:gauges.entrySet()){
-			onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
-		}
-		for(SortedMap.Entry<String, Counter> entry:counters.entrySet()){
-			onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
-		}
-		for(SortedMap.Entry<String, Histogram> entry:histograms.entrySet()){
-			onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
-		}
-		for(SortedMap.Entry<String, Meter> entry:meters.entrySet()){
-			onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
-		}
-		for(SortedMap.Entry<String, Timer> entry:timers.entrySet()){
-			onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
-		}
-	}
-
-	private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
-	private void onMetricEvent(MetricEvent event){
-		try {
-			if(additionalFields!=null){
-				event.putAll(additionalFields);
-			}
-			// TODO: Support configurable partition key
-			byte[] key = ByteUtils.intToBytes(event.hashCode());
-			ProducerRecord<byte[],String> record  = new ProducerRecord<>(topic, key, OBJECT_MAPPER.writeValueAsString(event));
-			// TODO: Support configuration timeout
-			this.producer.send(record).get(5,TimeUnit.SECONDS);
-		} catch (JsonProcessingException e) {
-			LOG.error("Failed to serialize {} as json",event,e);
-		} catch (InterruptedException | ExecutionException | TimeoutException e) {
-			LOG.error("Failed to produce message to topic {}",topic,e);
-		}
-	}
-
-	@Override
-	public void stop() {
-		this.producer.close();
-		super.stop();
-	}
-
-	@Override
-	public void close() {
-		this.producer.close();
-		super.close();
-	}
-
-	public static Builder forRegistry(MetricRegistry registry){
-		return new Builder(registry);
-	}
-
-	public static class Builder{
-		private final MetricRegistry registry;
-		private TimeUnit rateUnit;
-		private TimeUnit durationUnit;
-		private MetricFilter filter;
-		private String topic;
-		private Properties properties;
-		private Map<String, Object> additionalFields;
-
-		private Builder(MetricRegistry registry) {
-			this.registry = registry;
-			this.rateUnit = TimeUnit.SECONDS;
-			this.durationUnit = TimeUnit.MILLISECONDS;
-			this.filter = MetricFilter.ALL;
-		}
-
-		/**
-		 * Convert rates to the given time unit.
-		 *
-		 * @param rateUnit a unit of time
-		 * @return {@code this}
-		 */
-		public Builder convertRatesTo(TimeUnit rateUnit) {
-			this.rateUnit = rateUnit;
-			return this;
-		}
-
-		/**
-		 * Convert durations to the given time unit.
-		 *
-		 * @param durationUnit a unit of time
-		 * @return {@code this}
-		 */
-		public Builder convertDurationsTo(TimeUnit durationUnit) {
-			this.durationUnit = durationUnit;
-			return this;
-		}
-
-		/**
-		 * Only report metrics which match the given filter.
-		 *
-		 * @param filter a {@link MetricFilter}
-		 * @return {@code this}
-		 */
-		public Builder filter(MetricFilter filter) {
-			this.filter = filter;
-			return this;
-		}
-
-		public Builder topic(String topic){
-			this.topic = topic;
-			return this;
-		}
-
-		public Builder config(Properties properties){
-			this.properties = properties;
-			return this;
-		}
-
-		/**
-		 * Builds a {@link ConsoleReporter} with the given properties.
-		 *
-		 * @return a {@link ConsoleReporter}
-		 */
-		public KafkaReporter build() {
-			if(topic == null && properties!=null) topic = properties.getProperty("topic");
-			return new KafkaReporter(registry,filter,rateUnit,durationUnit,topic,properties,additionalFields);
-		}
-
-		@SuppressWarnings("serial")
+    public void close() {
+        this.producer.close();
+        super.close();
+    }
+
+    public static Builder forRegistry(MetricRegistry registry) {
+        return new Builder(registry);
+    }
+
+    public static class Builder {
+        private final MetricRegistry registry;
+        private TimeUnit rateUnit;
+        private TimeUnit durationUnit;
+        private MetricFilter filter;
+        private String topic;
+        private Properties properties;
+        private Map<String, Object> additionalFields;
+
+        private Builder(MetricRegistry registry) {
+            this.registry = registry;
+            this.rateUnit = TimeUnit.SECONDS;
+            this.durationUnit = TimeUnit.MILLISECONDS;
+            this.filter = MetricFilter.ALL;
+        }
+
+        /**
+         * Convert rates to the given time unit.
+         *
+         * @param rateUnit a unit of time
+         * @return {@code this}
+         */
+        public Builder convertRatesTo(TimeUnit rateUnit) {
+            this.rateUnit = rateUnit;
+            return this;
+        }
+
+        /**
+         * Convert durations to the given time unit.
+         *
+         * @param durationUnit a unit of time
+         * @return {@code this}
+         */
+        public Builder convertDurationsTo(TimeUnit durationUnit) {
+            this.durationUnit = durationUnit;
+            return this;
+        }
+
+        /**
+         * Only report metrics which match the given filter.
+         *
+         * @param filter a {@link MetricFilter}
+         * @return {@code this}
+         */
+        public Builder filter(MetricFilter filter) {
+            this.filter = filter;
+            return this;
+        }
+
+        public Builder topic(String topic) {
+            this.topic = topic;
+            return this;
+        }
+
+        /**
+         * Builds a {@link KafkaReporter}, taking the topic from the "topic" property when none was set explicitly.
+         *
+         * @return a new {@link KafkaReporter}
+         */
+        public KafkaReporter build() {
+            if (topic == null && properties != null) {
+                topic = properties.getProperty("topic");
+            }
+            return new KafkaReporter(registry, filter, rateUnit, durationUnit, topic, properties, additionalFields);
+        }
+
+        @SuppressWarnings("serial")
         public Builder config(Config config) {
-			this.config(new Properties(){{
-				putAll(config.root().unwrapped());
-			}});
-			return this;
-		}
-
-		public Builder addFields(Map<String, Object> tags) {
-			this.additionalFields = tags;
-			return this;
-		}
-	}
+            this.config(new Properties() {
+                {
+                    putAll(config.root().unwrapped());
+                }
+            });
+            return this;
+        }
+
+        public Builder config(Properties properties) {
+            this.properties = properties;
+            return this;
+        }
+
+        public Builder addFields(Map<String, Object> tags) {
+            this.additionalFields = tags;
+            return this;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java
index fd6cc41..6ded685 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java
@@ -1,12 +1,4 @@
-package org.apache.eagle.alert.metric.sink;
-
-import java.util.concurrent.TimeUnit;
-
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,15 +14,24 @@ import com.typesafe.config.Config;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+package org.apache.eagle.alert.metric.sink;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import java.util.concurrent.TimeUnit;
+
 public class ConsoleSink implements MetricSink {
     private ConsoleReporter reporter;
+
     @Override
     public void prepare(Config config, MetricRegistry registry) {
         reporter = ConsoleReporter.forRegistry(registry).build();
     }
 
     @Override
-    public void start(long period,TimeUnit unit) {
+    public void start(long period, TimeUnit unit) {
         reporter.start(period, unit);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
index 7e30b82..4de98cf 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
@@ -16,43 +16,42 @@
  */
 package org.apache.eagle.alert.metric.sink;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
 import org.elasticsearch.metrics.ElasticsearchReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 public class ElasticSearchSink implements MetricSink {
     private ElasticsearchReporter reporter = null;
-    private final static Logger LOG = LoggerFactory.getLogger(ElasticSearchSink.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchSink.class);
 
     @Override
     public void prepare(Config config, MetricRegistry registry) {
         LOG.debug("Preparing elasticsearch-sink");
         try {
             ElasticsearchReporter.Builder builder = ElasticsearchReporter.forRegistry(registry);
-            if(config.hasPath("hosts")){
+            if (config.hasPath("hosts")) {
                 List<String> hosts = config.getStringList("hosts");
                 builder.hosts(hosts.toArray(new String[hosts.size()]));
             }
-            if(config.hasPath("index")){
+            if (config.hasPath("index")) {
                 builder.index(config.getString("index"));
             }
             builder.indexDateFormat("yyyy-MM-dd");
-            builder.timestampFieldname(config.hasPath("timestampField")?config.getString("timestampField"):"@timestamp");
+            builder.timestampFieldname(config.hasPath("timestampField") ? config.getString("timestampField") : "@timestamp");
 
-            if(config.hasPath("tags")) {
+            if (config.hasPath("tags")) {
                 builder.additionalFields(config.getConfig("tags").root().unwrapped());
             }
 
             reporter = builder.build();
         } catch (IOException e) {
-            LOG.error(e.getMessage(),e);
+            LOG.error(e.getMessage(), e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/JmxSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/JmxSink.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/JmxSink.java
index fddaf19..2d3ed93 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/JmxSink.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/JmxSink.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,11 +16,10 @@
  */
 package org.apache.eagle.alert.metric.sink;
 
-import java.util.concurrent.TimeUnit;
-
 import com.codahale.metrics.JmxReporter;
 import com.codahale.metrics.MetricRegistry;
 import com.typesafe.config.Config;
+import java.util.concurrent.TimeUnit;
 
 public class JmxSink implements MetricSink {
     private JmxReporter reporter;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
index 6ff000c..88b22c5 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
@@ -1,15 +1,4 @@
-package org.apache.eagle.alert.metric.sink;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.eagle.alert.metric.reporter.KafkaReporter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -25,17 +14,28 @@ import com.typesafe.config.Config;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-public class KafkaSink implements MetricSink{
+
+package org.apache.eagle.alert.metric.sink;
+
+import org.apache.eagle.alert.metric.reporter.KafkaReporter;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
+
+public class KafkaSink implements MetricSink {
     private KafkaReporter reporter;
-    private final static Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
+
     @Override
     public void prepare(Config config, MetricRegistry registry) {
         LOG.debug("Preparing kafka-sink");
         KafkaReporter.Builder builder = KafkaReporter.forRegistry(registry)
-                .topic(config.getString("topic"))
-                .config(config);
+            .topic(config.getString("topic"))
+            .config(config);
 
-        if(config.hasPath("tags")){
+        if (config.hasPath("tags")) {
             builder.addFields(config.getConfig("tags").root().unwrapped());
         }
 
@@ -46,7 +46,7 @@ public class KafkaSink implements MetricSink{
     @Override
     public void start(long period, TimeUnit unit) {
         LOG.info("Starting");
-        reporter.start(period,unit);
+        reporter.start(period, unit);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSink.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSink.java
index b09eda3..2030d8e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSink.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSink.java
@@ -16,15 +16,16 @@
  */
 package org.apache.eagle.alert.metric.sink;
 
-
-import java.util.concurrent.TimeUnit;
-
 import com.codahale.metrics.MetricRegistry;
 import com.typesafe.config.Config;
+import java.util.concurrent.TimeUnit;
 
 public interface MetricSink {
     void prepare(Config config, MetricRegistry registry);
-    void start(long period,TimeUnit unit);
+
+    void start(long period, TimeUnit unit);
+
     void stop();
+
     void report();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSinkRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSinkRepository.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSinkRepository.java
index b4126f2..70d7331 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSinkRepository.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/MetricSinkRepository.java
@@ -1,9 +1,4 @@
-package org.apache.eagle.alert.metric.sink;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,16 +14,21 @@ import java.util.Map;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.metric.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+
 public class MetricSinkRepository {
-    private final static Map<String,Class<? extends MetricSink>> sinkTypeClassMapping = new HashMap<>();
+    private static final Map<String, Class<? extends MetricSink>> sinkTypeClassMapping = new HashMap<>();
 
-    public static void register(String sinkType,Class<? extends MetricSink> sinkClass){
-        sinkTypeClassMapping.put(sinkType,sinkClass);
+    public static void register(String sinkType, Class<? extends MetricSink> sinkClass) {
+        sinkTypeClassMapping.put(sinkType, sinkClass);
     }
 
-    public static MetricSink createSink(String sinkType){
+    public static MetricSink createSink(String sinkType) {
         if (!sinkTypeClassMapping.containsKey(sinkType)) {
-            throw new IllegalArgumentException("Unknown sink type: "+sinkType);
+            throw new IllegalArgumentException("Unknown sink type: " + sinkType);
         }
         try {
             return sinkTypeClassMapping.get(sinkType).newInstance();
@@ -38,10 +38,10 @@ public class MetricSinkRepository {
     }
 
     static {
-        register("kafka",KafkaSink.class);
-        register("jmx",JmxSink.class);
-        register("elasticsearch",ElasticSearchSink.class);
-        register("stdout",ConsoleSink.class);
-        register("logger",Slf4jSink.class);
+        register("kafka", KafkaSink.class);
+        register("jmx", JmxSink.class);
+        register("elasticsearch", ElasticSearchSink.class);
+        register("stdout", ConsoleSink.class);
+        register("logger", Slf4jSink.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/Slf4jSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/Slf4jSink.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/Slf4jSink.java
index ce465fa..c25c835 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/Slf4jSink.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/Slf4jSink.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,51 +14,53 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.eagle.alert.metric.sink;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.LoggerFactory;
+package org.apache.eagle.alert.metric.sink;
 
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Slf4jReporter;
 import com.typesafe.config.Config;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 public class Slf4jSink implements MetricSink {
     private Slf4jReporter reporter;
 
     @SuppressWarnings("serial")
-    private final static Map<String,Slf4jReporter.LoggingLevel> LEVEL_MAPPING = new HashMap<String,Slf4jReporter.LoggingLevel>(){{
-        put("INFO",Slf4jReporter.LoggingLevel.INFO);
-        put("DEBUG",Slf4jReporter.LoggingLevel.DEBUG);
-        put("ERROR",Slf4jReporter.LoggingLevel.ERROR);
-        put("TRACE",Slf4jReporter.LoggingLevel.TRACE);
-        put("WARN",Slf4jReporter.LoggingLevel.WARN);
-    }};
+    private static final Map<String, Slf4jReporter.LoggingLevel> LEVEL_MAPPING = new HashMap<String, Slf4jReporter.LoggingLevel>() {
+        {
+            put("INFO", Slf4jReporter.LoggingLevel.INFO);
+            put("DEBUG", Slf4jReporter.LoggingLevel.DEBUG);
+            put("ERROR", Slf4jReporter.LoggingLevel.ERROR);
+            put("TRACE", Slf4jReporter.LoggingLevel.TRACE);
+            put("WARN", Slf4jReporter.LoggingLevel.WARN);
+        }
+    };
 
-    private static Slf4jReporter.LoggingLevel getLoggingLevel(String level){
-        if(LEVEL_MAPPING.containsKey(level.toUpperCase())){
+    private static Slf4jReporter.LoggingLevel getLoggingLevel(String level) {
+        if (LEVEL_MAPPING.containsKey(level.toUpperCase())) {
             return LEVEL_MAPPING.get(level.toUpperCase());
-        } else{
-            throw new IllegalArgumentException("Illegal logging level: "+level);
+        } else {
+            throw new IllegalArgumentException("Illegal logging level: " + level);
         }
     }
 
     @Override
     public void prepare(Config config, MetricRegistry registry) {
         reporter = Slf4jReporter.forRegistry(registry)
-                .outputTo(LoggerFactory.getLogger("org.apache.eagle.alert.metric"))
-                .withLoggingLevel(config.hasPath("level")? getLoggingLevel(config.getString("level")): Slf4jReporter.LoggingLevel.INFO)
-                .convertRatesTo(TimeUnit.SECONDS)
-                .convertDurationsTo(TimeUnit.MILLISECONDS)
-                .build();
+            .outputTo(LoggerFactory.getLogger("org.apache.eagle.alert.metric"))
+            .withLoggingLevel(config.hasPath("level") ? getLoggingLevel(config.getString("level")) : Slf4jReporter.LoggingLevel.INFO)
+            .convertRatesTo(TimeUnit.SECONDS)
+            .convertDurationsTo(TimeUnit.MILLISECONDS)
+            .build();
     }
 
     @Override
-    public void start(long period,TimeUnit unit) {
-        reporter.start(period,unit);
+    public void start(long period, TimeUnit unit) {
+        reporter.start(period, unit);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/JVMMetricSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/JVMMetricSource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/JVMMetricSource.java
index a5bb5f4..8261a25 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/JVMMetricSource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/JVMMetricSource.java
@@ -1,10 +1,4 @@
-package org.apache.eagle.alert.metric.source;
-
-import com.codahale.metrics.JvmAttributeGaugeSet;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,11 +14,17 @@ import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-public class JVMMetricSource implements MetricSource{
+package org.apache.eagle.alert.metric.source;
+
+import com.codahale.metrics.JvmAttributeGaugeSet;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+
+public class JVMMetricSource implements MetricSource {
 
     private MetricRegistry registry = new MetricRegistry();
 
-    public JVMMetricSource(){
+    public JVMMetricSource() {
         registry.registerAll(new JvmAttributeGaugeSet());
         registry.registerAll(new MemoryUsageGaugeSet());
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSource.java
index 59b7a02..180fa97 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSource.java
@@ -1,8 +1,4 @@
-package org.apache.eagle.alert.metric.source;
-
-import com.codahale.metrics.MetricRegistry;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -18,7 +14,12 @@ import com.codahale.metrics.MetricRegistry;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.metric.source;
+
+import com.codahale.metrics.MetricRegistry;
+
 public interface MetricSource {
     String name();
+
     MetricRegistry registry();
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSourceWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSourceWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSourceWrapper.java
index 137fc17..d83576c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSourceWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/source/MetricSourceWrapper.java
@@ -1,8 +1,4 @@
-package org.apache.eagle.alert.metric.source;
-
-import com.codahale.metrics.MetricRegistry;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -18,11 +14,15 @@ import com.codahale.metrics.MetricRegistry;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.metric.source;
+
+import com.codahale.metrics.MetricRegistry;
+
 public class MetricSourceWrapper implements MetricSource {
     private final MetricRegistry registry;
     private final String name;
 
-    public MetricSourceWrapper(String name, MetricRegistry registry){
+    public MetricSourceWrapper(String name, MetricRegistry registry) {
         this.name = name;
         this.registry = registry;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/resource/SimpleCORSFiler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/resource/SimpleCORSFiler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/resource/SimpleCORSFiler.java
index addab44..2e1c72c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/resource/SimpleCORSFiler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/resource/SimpleCORSFiler.java
@@ -29,9 +29,8 @@ import javax.servlet.http.HttpServletResponse;
 /**
  * A simple allow all CORS filter that works with swagger UI. Tomcat CORS filter
  * doesn't support Origin: null case which is the swagger UI request.
- * 
- * @since Apr 15, 2016
  *
+ * @since Apr 15, 2016
  */
 public class SimpleCORSFiler implements Filter {
 
@@ -42,13 +41,13 @@ public class SimpleCORSFiler implements Filter {
 
     @Override
     public void doFilter(ServletRequest request, ServletResponse res, FilterChain chain) throws IOException,
-            ServletException {
+        ServletException {
         HttpServletResponse response = (HttpServletResponse) res;
         response.setHeader("Access-Control-Allow-Origin", "*");
         response.setHeader("Access-Control-Allow-Methods", "HEAD, POST, GET, OPTIONS, DELETE");
         response.setHeader("Access-Control-Max-Age", "3600");
         response.setHeader("Access-Control-Allow-Headers",
-                "Origin, Accept, X-Requested-With, Content-Type, Access-Control-Request-Method, Access-Control-Request-Headers");
+            "Origin, Accept, X-Requested-With, Content-Type, Access-Control-Request-Method, Access-Control-Request-Headers");
         chain.doFilter(request, response);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
index 80cb169..2d1072d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/IMetadataServiceClient.java
@@ -19,10 +19,6 @@
 
 package org.apache.eagle.alert.service;
 
-import java.io.Closeable;
-import java.io.Serializable;
-import java.util.List;
-
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
@@ -31,45 +27,62 @@ import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.List;
 
 /**
- * service stub to get metadata from remote metadata service
+ * service stub to get metadata from remote metadata service.
  */
 public interface IMetadataServiceClient extends Closeable, Serializable {
 
     // user metadta
     void addStreamingCluster(StreamingCluster cluster);
+
     void addStreamingClusters(List<StreamingCluster> clusters);
+
     List<StreamingCluster> listClusters();
-    
+
     List<Topology> listTopologies();
+
     void addTopology(Topology t);
+
     void addTopologies(List<Topology> topologies);
 
     void addPolicy(PolicyDefinition policy);
+
     void addPolicies(List<PolicyDefinition> policies);
+
     List<PolicyDefinition> listPolicies();
 
     void addStreamDefinition(StreamDefinition streamDef);
+
     void addStreamDefinitions(List<StreamDefinition> streamDefs);
+
     List<StreamDefinition> listStreams();
 
     void addDataSource(Kafka2TupleMetadata k2t);
+
     void addDataSources(List<Kafka2TupleMetadata> k2ts);
+
     List<Kafka2TupleMetadata> listDataSources();
 
     void addPublishment(Publishment pub);
+
     void addPublishments(List<Publishment> pubs);
+
     List<Publishment> listPublishment();
 
     // monitor metadata
     List<SpoutSpec> listSpoutMetadata();
 
     ScheduleState getVersionedSpec();
+
     ScheduleState getVersionedSpec(String version);
+
     void addScheduleState(ScheduleState state);
-    
+
     void clear();
-    
+
     // for topology mgmt
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
index 2ddba55..8178824 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
@@ -19,13 +19,6 @@
 
 package org.apache.eagle.alert.service;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.ws.rs.core.MediaType;
-
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
@@ -34,9 +27,8 @@ import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.GenericType;
@@ -45,6 +37,13 @@ import com.sun.jersey.api.client.config.ClientConfig;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
 import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.ws.rs.core.MediaType;
 
 public class MetadataServiceClientImpl implements IMetadataServiceClient {
     private static final long serialVersionUID = 3003976065082684128L;
@@ -82,7 +81,7 @@ public class MetadataServiceClientImpl implements IMetadataServiceClient {
 
     public MetadataServiceClientImpl(Config config) {
         this(config.getString(EAGLE_CORRELATION_SERVICE_HOST), config.getInt(EAGLE_CORRELATION_SERVICE_PORT), config
-                .getString(EAGLE_CORRELATION_CONTEXT));
+            .getString(EAGLE_CORRELATION_CONTEXT));
         basePath = buildBasePath();
     }
 
@@ -163,30 +162,30 @@ public class MetadataServiceClientImpl implements IMetadataServiceClient {
         return listOne(METADATA_SCHEDULESTATES_PATH + "/" + version, ScheduleState.class);
     }
 
+    @Override
+    public ScheduleState getVersionedSpec() {
+        return listOne(METADATA_SCHEDULESTATES_PATH, ScheduleState.class);
+    }
+
     private <T> T listOne(String path, Class<T> tClz) {
         LOG.info("query URL {}", basePath + path);
         WebResource r = client.resource(basePath + path);
 
         ClientResponse resp = r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON)
-                .get(ClientResponse.class);
+            .get(ClientResponse.class);
         if (resp.getStatus() < 300) {
             try {
                 return resp.getEntity(tClz);
             } catch (Exception e) {
                 LOG.warn(" list one entity failed, ignored and continute, path {}, message {}!", path, e.getMessage());
             }
-        }else{
+        } else {
             LOG.warn("fail querying metadata service {} with http status {}", basePath + path, resp.getStatus());
         }
         return null;
     }
 
     @Override
-    public ScheduleState getVersionedSpec() {
-        return listOne(METADATA_SCHEDULESTATES_PATH, ScheduleState.class);
-    }
-
-    @Override
     public void addScheduleState(ScheduleState state) {
         WebResource r = client.resource(basePath + METADATA_SCHEDULESTATES_PATH);
         r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(state);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
index 87c45d8..2cbc977 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
@@ -1,6 +1,4 @@
-package org.apache.eagle.alert.utils;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,12 +14,14 @@ package org.apache.eagle.alert.utils;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.utils;
+
 public class AlertConstants {
-    public final static String FIELD_0 = "f0";
-    public final static String FIELD_1 = "f1";
-    public final static String FIELD_2 = "f2";
-    public final static String FIELD_3 = "f3";
+    public static final String FIELD_0 = "f0";
+    public static final String FIELD_1 = "f1";
+    public static final String FIELD_2 = "f2";
+    public static final String FIELD_3 = "f3";
 
-    public final static String DEFAULT_SPOUT_NAME = "alertEngineSpout";
-    public final static String DEFAULT_ROUTERBOLT_NAME = "streamRouterBolt";
+    public static final String DEFAULT_SPOUT_NAME = "alertEngineSpout";
+    public static final String DEFAULT_ROUTERBOLT_NAME = "streamRouterBolt";
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ByteUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ByteUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ByteUtils.java
index 2e5a8fe..53fc4ac 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ByteUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ByteUtils.java
@@ -20,134 +20,134 @@ import java.io.UnsupportedEncodingException;
 
 public class ByteUtils {
 
-	public static double bytesToDouble(byte[] bytes, int offset){
-		return Double.longBitsToDouble(bytesToLong(bytes, offset));
-	}
-	
-	public static double bytesToDouble(byte[] bytes){
-		return Double.longBitsToDouble(bytesToLong(bytes));
-	}
-	
-	public static void doubleToBytes(double v, byte[] bytes){
-		doubleToBytes(v, bytes, 0);
-	}
-	
-	public static void doubleToBytes(double v, byte[] bytes, int offset){
-		longToBytes(Double.doubleToLongBits(v), bytes, offset);
-	}
-	
-	public static byte[] doubleToBytes(double v){
-		return longToBytes(Double.doubleToLongBits(v));
-	}
-	
-	public static long bytesToLong(byte[] bytes){
-		return bytesToLong(bytes, 0);
-	}
-	
-	public static long bytesToLong(byte[] bytes, int offset){
-		long value = 0;
-		for(int i=0; i<8; i++){
-			value <<= 8;
-			value |= (bytes[i+offset] & 0xFF);
-		}
-		return value;
-	}
-	
-	public static void longToBytes(long v, byte[] bytes){
-		longToBytes(v, bytes, 0);
-	}
-	
-	public static void longToBytes(long v, byte[] bytes, int offset){
-		long tmp = v;
-		for(int i=0; i<8; i++){
-			bytes[offset + 7 - i] = (byte)(tmp & 0xFF);
-			tmp >>= 8;
-		}
-	}
-	
-	public static byte[] longToBytes(long v){
-		long tmp = v;
-		byte[] b = new byte[8];
-		for(int i=0; i<8; i++){
-			b[7-i] = (byte)(tmp & 0xFF);
-			tmp >>= 8;
-		}
-		return b;
-	}
-	
-	public static int bytesToInt(byte[] bytes){
-		return bytesToInt(bytes, 0);
-	}
-	
-	public static int bytesToInt(byte[] bytes, int offset){
-		int value = 0;
-		for(int i=0; i<4; i++){
-			value <<= 8;
-			value |= (bytes[i+offset] & 0xFF);
-		}
-		return value;
-	}
-	
-	public static void intToBytes(int v, byte[] bytes){
-		intToBytes(v, bytes, 0);
-	}
-	
-	public static void intToBytes(int v, byte[] bytes, int offset){
-		int tmp = v;
-		for(int i=0; i<4; i++){
-			bytes[offset + 3 - i] = (byte)(tmp & 0xFF);
-			tmp >>= 8;
-		}
-	}
-
-	public static byte[] intToBytes(int v){
-		int tmp = v;
-		byte[] b = new byte[4];
-		for(int i=0; i<4; i++){
-			b[3-i] = (byte)(tmp & 0xFF);
-			tmp >>= 8;
-		}
-		return b;
-	}
-
-	//////
-	
-	public static short bytesToShort(byte[] bytes){
-		return bytesToShort(bytes, 0);
-	}
-	
-	public static short bytesToShort(byte[] bytes, int offset){
-		short value = 0;
-		for(int i=0; i < 2; i++){
-			value <<= 8;
-			value |= (bytes[i+offset] & 0xFF);
-		}
-		return value;
-	}
-	
-	public static void shortToBytes(short v, byte[] bytes){
-		shortToBytes(v, bytes, 0);
-	}
-	
-	public static void shortToBytes(short v, byte[] bytes, int offset){
-		int tmp = v;
-		for(int i=0; i < 2; i++){
-			bytes[offset + 1 - i] = (byte)(tmp & 0xFF);
-			tmp >>= 8;
-		}
-	}
-
-	public static byte[] shortToBytes(short v){
-		int tmp = v;
-		byte[] b = new byte[2];
-		for(int i=0; i<2; i++){
-			b[1-i] = (byte)(tmp & 0xFF);
-			tmp >>= 8;
-		}
-		return b;
-	}
-
-	public static byte[] concat(byte[]... arrays) {
+    public static double bytesToDouble(byte[] bytes, int offset) {
+        return Double.longBitsToDouble(bytesToLong(bytes, offset));
+    }
+
+    public static double bytesToDouble(byte[] bytes) {
+        return Double.longBitsToDouble(bytesToLong(bytes));
+    }
+
+    public static void doubleToBytes(double v, byte[] bytes) {
+        doubleToBytes(v, bytes, 0);
+    }
+
+    public static void doubleToBytes(double v, byte[] bytes, int offset) {
+        longToBytes(Double.doubleToLongBits(v), bytes, offset);
+    }
+
+    public static byte[] doubleToBytes(double v) {
+        return longToBytes(Double.doubleToLongBits(v));
+    }
+
+    public static long bytesToLong(byte[] bytes) {
+        return bytesToLong(bytes, 0);
+    }
+
+    public static long bytesToLong(byte[] bytes, int offset) {
+        long value = 0;
+        for (int i = 0; i < 8; i++) {
+            value <<= 8;
+            value |= (bytes[i + offset] & 0xFF);
+        }
+        return value;
+    }
+
+    public static void longToBytes(long v, byte[] bytes) {
+        longToBytes(v, bytes, 0);
+    }
+
+    public static void longToBytes(long v, byte[] bytes, int offset) {
+        long tmp = v;
+        for (int i = 0; i < 8; i++) {
+            bytes[offset + 7 - i] = (byte) (tmp & 0xFF);
+            tmp >>= 8;
+        }
+    }
+
+    public static byte[] longToBytes(long v) {
+        long tmp = v;
+        byte[] b = new byte[8];
+        for (int i = 0; i < 8; i++) {
+            b[7 - i] = (byte) (tmp & 0xFF);
+            tmp >>= 8;
+        }
+        return b;
+    }
+
+    public static int bytesToInt(byte[] bytes) {
+        return bytesToInt(bytes, 0);
+    }
+
+    public static int bytesToInt(byte[] bytes, int offset) {
+        int value = 0;
+        for (int i = 0; i < 4; i++) {
+            value <<= 8;
+            value |= (bytes[i + offset] & 0xFF);
+        }
+        return value;
+    }
+
+    public static void intToBytes(int v, byte[] bytes) {
+        intToBytes(v, bytes, 0);
+    }
+
+    public static void intToBytes(int v, byte[] bytes, int offset) {
+        int tmp = v;
+        for (int i = 0; i < 4; i++) {
+            bytes[offset + 3 - i] = (byte) (tmp & 0xFF);
+            tmp >>= 8;
+        }
+    }
+
+    public static byte[] intToBytes(int v) {
+        int tmp = v;
+        byte[] b = new byte[4];
+        for (int i = 0; i < 4; i++) {
+            b[3 - i] = (byte) (tmp & 0xFF);
+            tmp >>= 8;
+        }
+        return b;
+    }
+
+    //////
+
+    public static short bytesToShort(byte[] bytes) {
+        return bytesToShort(bytes, 0);
+    }
+
+    public static short bytesToShort(byte[] bytes, int offset) {
+        short value = 0;
+        for (int i = 0; i < 2; i++) {
+            value <<= 8;
+            value |= (bytes[i + offset] & 0xFF);
+        }
+        return value;
+    }
+
+    public static void shortToBytes(short v, byte[] bytes) {
+        shortToBytes(v, bytes, 0);
+    }
+
+    public static void shortToBytes(short v, byte[] bytes, int offset) {
+        int tmp = v;
+        for (int i = 0; i < 2; i++) {
+            bytes[offset + 1 - i] = (byte) (tmp & 0xFF);
+            tmp >>= 8;
+        }
+    }
+
+    public static byte[] shortToBytes(short v) {
+        int tmp = v;
+        byte[] b = new byte[2];
+        for (int i = 0; i < 2; i++) {
+            b[1 - i] = (byte) (tmp & 0xFF);
+            tmp >>= 8;
+        }
+        return b;
+    }
+
+    public static byte[] concat(byte[]... arrays) {
         int length = 0;
         for (byte[] array : arrays) {
             length += array.length;
@@ -161,28 +161,28 @@ public class ByteUtils {
         return result;
     }
 
-	public static byte[] stringToBytes(String str) {
-		try {
-			return str.getBytes("UTF-8");
-		} catch (UnsupportedEncodingException e) {
-			throw new IllegalStateException(e);
-		}
-	}
-
-//    public static void main(String[] args){ 
-//    	int a = "ThreadName".hashCode();
-//    	byte[] b = intToBytes(a);
-//    	byte[] c = intToBytes(1676687583);
-//    	String s = new String(b);
-//    	System.out.println(s);
-    	
-//    	byte[] d = intToBytes(8652353);
-//    	System.out.println(bytesToInt(d));
-    	
-//    	byte[] e = longToBytes(12131513513l);
-//    	System.out.println(bytesToLong(e));
-//    	if(12131513513l == bytesToLong(e)){
-//    		System.out.println("yes");
-//    	}
-//    }
+    public static byte[] stringToBytes(String str) {
+        try {
+            return str.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    //    public static void main(String[] args){ 
+    //      int a = "ThreadName".hashCode();
+    //      byte[] b = intToBytes(a);
+    //      byte[] c = intToBytes(1676687583);
+    //      String s = new String(b);
+    //      System.out.println(s);
+    
+    //      byte[] d = intToBytes(8652353);
+    //      System.out.println(bytesToInt(d));
+    
+    //      byte[] e = longToBytes(12131513513l);
+    //      System.out.println(bytesToLong(e));
+    //      if(12131513513l == bytesToLong(e)){
+    //        System.out.println("yes");
+    //      }
+    //    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ConfigUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ConfigUtils.java
index 685265f..e9634e2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ConfigUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ConfigUtils.java
@@ -1,10 +1,4 @@
-package org.apache.eagle.alert.utils;
-
-import java.util.Properties;
-
-import com.typesafe.config.Config;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,12 +14,19 @@ import com.typesafe.config.Config;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.utils;
+
+import com.typesafe.config.Config;
+import java.util.Properties;
+
 public class ConfigUtils {
 
     @SuppressWarnings("serial")
-    public static Properties toProperties(Config config){
-        return new Properties(){{
-            putAll(config.root().unwrapped());
-        }};
+    public static Properties toProperties(Config config) {
+        return new Properties() {
+            {
+                putAll(config.root().unwrapped());
+            }
+        };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/DateTimeUtil.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/DateTimeUtil.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/DateTimeUtil.java
index d611b95..8b98959 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/DateTimeUtil.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/DateTimeUtil.java
@@ -24,118 +24,119 @@ import java.util.TimeZone;
 
 /**
  * be aware that SimpleDateFormat instantiation is expensive, so if that's under a tight loop, probably we need
- * a thread local SimpleDateFormat object
+ * a thread local SimpleDateFormat object.
  */
 public class DateTimeUtil {
-	public static final long ONESECOND = 1L * 1000L;
-	public static final long ONEMINUTE = 1L * 60L * 1000L;
-	public static final long ONEHOUR = 1L * 60L * 60L * 1000L;
-	public static final long ONEDAY = 24L * 60L * 60L * 1000L;
+    public static final long ONESECOND = 1L * 1000L;
+    public static final long ONEMINUTE = 1L * 60L * 1000L;
+    public static final long ONEHOUR = 1L * 60L * 60L * 1000L;
+    public static final long ONEDAY = 24L * 60L * 60L * 1000L;
     private static TimeZone CURRENT_TIME_ZONE = TimeZone.getDefault();
 
-	public static Date humanDateToDate(String date) throws ParseException {
-		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    public static Date humanDateToDate(String date) throws ParseException {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         sdf.setTimeZone(CURRENT_TIME_ZONE);
-		return sdf.parse(date);
-	}
+        return sdf.parse(date);
+    }
 
-	public static long getCurrentTimestamp(){
-		return System.currentTimeMillis();
-	}
-	
-	public static String secondsToHumanDate(long seconds){
-		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    public static long getCurrentTimestamp() {
+        return System.currentTimeMillis();
+    }
+
+    public static String secondsToHumanDate(long seconds) {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         sdf.setTimeZone(CURRENT_TIME_ZONE);
-		Date t = new Date();
-		t.setTime(seconds*1000);
-		return sdf.format(t);
-	}
-	
-	public static String millisecondsToHumanDateWithMilliseconds(long milliseconds){
-		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+        Date t = new Date();
+        t.setTime(seconds * 1000);
+        return sdf.format(t);
+    }
+
+    public static String millisecondsToHumanDateWithMilliseconds(long milliseconds) {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
         sdf.setTimeZone(CURRENT_TIME_ZONE);
-		Date t = new Date();
-		t.setTime(milliseconds);
-		return sdf.format(t);
-	}
-	
-	public static String millisecondsToHumanDateWithSeconds(long milliseconds){
-		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        Date t = new Date();
+        t.setTime(milliseconds);
+        return sdf.format(t);
+    }
+
+    public static String millisecondsToHumanDateWithSeconds(long milliseconds) {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         sdf.setTimeZone(CURRENT_TIME_ZONE);
-		Date t = new Date();
-		t.setTime(milliseconds);
-		return sdf.format(t);
-	}
-	
-	public static long humanDateToSeconds(String date) throws ParseException {
-		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        Date t = new Date();
+        t.setTime(milliseconds);
+        return sdf.format(t);
+    }
+
+    public static long humanDateToSeconds(String date) throws ParseException {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         sdf.setTimeZone(CURRENT_TIME_ZONE);
-		Date d = sdf.parse(date);
-		return d.getTime()/1000;
-	}
-	
-	public static long humanDateToMilliseconds(String date) throws ParseException {
-		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+        Date d = sdf.parse(date);
+        return d.getTime() / 1000;
+    }
+
+    public static long humanDateToMilliseconds(String date) throws ParseException {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
         sdf.setTimeZone(CURRENT_TIME_ZONE);
-		Date d = sdf.parse(date);
-		return d.getTime();
-	}
-	
-	
-	public static long humanDateToMillisecondsWithoutException(String date){
-		try{
-			SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+        Date d = sdf.parse(date);
+        return d.getTime();
+    }
+
+
+    public static long humanDateToMillisecondsWithoutException(String date) {
+        try {
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
             sdf.setTimeZone(CURRENT_TIME_ZONE);
-			Date d = sdf.parse(date);
-			return d.getTime();
-		}catch(ParseException ex){
-			return 0L;
-		}
-	}
-	
-	public static long humanDateToSecondsWithoutException(String date){
-		try{
-			SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+            Date d = sdf.parse(date);
+            return d.getTime();
+        } catch (ParseException ex) {
+            return 0L;
+        }
+    }
+
+    public static long humanDateToSecondsWithoutException(String date) {
+        try {
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
             sdf.setTimeZone(CURRENT_TIME_ZONE);
-			Date d = sdf.parse(date);
-			return (d.getTime() / 1000);
-		}catch(ParseException ex){
-			return 0L;
-		}
-	}
-	/**
-	 * this could be accurate only when timezone is UTC
-	 * for the timezones other than UTC, there is possibly issue, for example
-	 * assume timezone is GMT+8 in China
-	 * When user time is "2014-07-15 05:00:00", it will be converted to timestamp first, internally it would be  "2014-07-14 21:00:00" in UTC timezone. When rounded down to day, the internal time would 
-	 * be changed to "2014-07-14 00:00:00", and that means the user time is "2014-07-14 08:00:00". But originally user wants to round it to "2014-07-15 00:00:00"
-	 * 
-	 * @param field
-	 * @param timeInMillis the seconds elapsed since 1970-01-01 00:00:00
-	 * @return
-	 */
-	public static long roundDown(int field, long timeInMillis){
-		switch(field){
-			case Calendar.DAY_OF_MONTH:
-			case Calendar.DAY_OF_WEEK:
-			case Calendar.DAY_OF_YEAR:
-				return (timeInMillis - timeInMillis % (24*60*60*1000));
-			case Calendar.HOUR:
-				return (timeInMillis - timeInMillis % (60*60*1000));
-			case Calendar.MINUTE:
-				return (timeInMillis - timeInMillis % (60*1000));
-			case Calendar.SECOND:
-				return (timeInMillis - timeInMillis % (1000));
-			default:
-				return 0L;
-		}
-	}
+            Date d = sdf.parse(date);
+            return (d.getTime() / 1000);
+        } catch (ParseException ex) {
+            return 0L;
+        }
+    }
+
+    /**
+     * Rounds a timestamp down to the given calendar field. The result is accurate only when the timezone
+     * is UTC; for timezones other than UTC the result may be wrong. For example,
+     * assume the timezone is GMT+8 (China).
+     * When the user time is "2014-07-15 05:00:00", it is first converted to a timestamp,
+     * which internally is "2014-07-14 21:00:00" in the UTC timezone. When rounded down to day, the internal time
+     * becomes "2014-07-14 00:00:00", which means the user time is "2014-07-14 08:00:00".
+     * But the user originally wanted to round it to "2014-07-15 00:00:00".
+     *
+     * @param timeInMillis the milliseconds elapsed since 1970-01-01 00:00:00
+     */
+    public static long roundDown(int field, long timeInMillis) {
+        switch (field) {
+            case Calendar.DAY_OF_MONTH:
+            case Calendar.DAY_OF_WEEK:
+            case Calendar.DAY_OF_YEAR:
+                return (timeInMillis - timeInMillis % (24 * 60 * 60 * 1000));
+            case Calendar.HOUR:
+                return (timeInMillis - timeInMillis % (60 * 60 * 1000));
+            case Calendar.MINUTE:
+                return (timeInMillis - timeInMillis % (60 * 1000));
+            case Calendar.SECOND:
+                return (timeInMillis - timeInMillis % (1000));
+            default:
+                return 0L;
+        }
+    }
 
-	public static String format(long milliseconds, String format) {
-		SimpleDateFormat sdf = new SimpleDateFormat(format);
+    public static String format(long milliseconds, String format) {
+        SimpleDateFormat sdf = new SimpleDateFormat(format);
         sdf.setTimeZone(CURRENT_TIME_ZONE);
-		Date t = new Date();
-		t.setTime(milliseconds);
-		return sdf.format(t);
-	}
+        Date t = new Date();
+        t.setTime(milliseconds);
+        return sdf.format(t);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/HostUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/HostUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/HostUtils.java
index 556a12c..e3f72e2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/HostUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/HostUtils.java
@@ -26,7 +26,7 @@ import java.net.UnknownHostException;
 import java.util.Enumeration;
 
 /**
- * http://stackoverflow.com/questions/7348711/recommended-way-to-get-hostname-in-java
+ * http://stackoverflow.com/questions/7348711/recommended-way-to-get-hostname-in-java.
  */
 public class HostUtils {
     private static final Logger logger = LoggerFactory

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/KafkaEmbedded.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/KafkaEmbedded.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/KafkaEmbedded.java
index db4d735..7ec5872 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/KafkaEmbedded.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/KafkaEmbedded.java
@@ -16,14 +16,12 @@
  */
 package org.apache.eagle.alert.utils;
 
-import java.io.File;
-import java.util.Properties;
-
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.InstanceSpec;
+import java.io.File;
+import java.util.Properties;
 
 public class KafkaEmbedded {
 
@@ -54,8 +52,8 @@ public class KafkaEmbedded {
         }
     }
 
-    public KafkaEmbedded(String kafka_url, String zk_url) {
-        this(extractKafkaPort(kafka_url), extractKafkaPort(zk_url));
+    public KafkaEmbedded(String kafkaUrl, String zkUrl) {
+        this(extractKafkaPort(kafkaUrl), extractKafkaPort(zkUrl));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
index 88661d1..7ef54f4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
@@ -1,6 +1,4 @@
-package org.apache.eagle.alert.utils;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,21 +14,20 @@ package org.apache.eagle.alert.utils;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.utils;
+
 public class StreamIdConversion {
-    public final static String STREAM_ID_TEMPLATE = "stream_%s_to_%s";
-    public final static String STREAM_ID_NUM_TEMPLATE = "stream_%s";
-    public static String generateStreamIdBetween(String sourceId, String targetId){
-        return String.format(STREAM_ID_TEMPLATE,sourceId,targetId);
+    public static final String STREAM_ID_TEMPLATE = "stream_%s_to_%s";
+    public static final String STREAM_ID_NUM_TEMPLATE = "stream_%s";
+
+    public static String generateStreamIdBetween(String sourceId, String targetId) {
+        return String.format(STREAM_ID_TEMPLATE, sourceId, targetId);
     }
 
     /**
-     * Hard-coded stream format in stream_${partitionNum}
-     *
-     *
-     * @param partitionNum
-     * @return
+     * Hard-coded stream format in stream_${partitionNum}.
      */
-    public static String generateStreamIdByPartition(int partitionNum){
-        return String.format(STREAM_ID_NUM_TEMPLATE,partitionNum);
+    public static String generateStreamIdByPartition(int partitionNum) {
+        return String.format(STREAM_ID_NUM_TEMPLATE, partitionNum);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/TimePeriodUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/TimePeriodUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/TimePeriodUtils.java
index 782188d..10f2cb2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/TimePeriodUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/TimePeriodUtils.java
@@ -23,40 +23,27 @@ import scala.Int;
 
 public class TimePeriodUtils {
     /**
-     * For example: timestamp stands for time: 1990/01/07 12:45 and period is PT30, then result is 1990/01/07 12:30
-     *
-     * @param seconds
-     * @param period
+     * For example: if the timestamp is 1990/01/07 12:45 and the period is PT30M, the result is 1990/01/07 12:30.
      *
      * @return formatted timestamp
      */
-    public static long formatSecondsByPeriod(long seconds,Seconds period){
+    public static long formatSecondsByPeriod(long seconds, Seconds period) {
         return seconds - (seconds % Int.int2long(period.getSeconds()));
     }
 
-    /**
-     * @param seconds
-     * @param period
-     * @return
-     */
-    public static long formatSecondsByPeriod(long seconds,Period period){
+    public static long formatSecondsByPeriod(long seconds, Period period) {
         return seconds - (seconds % Int.int2long(period.toStandardSeconds().getSeconds()));
     }
 
-    /**
-     * @param milliseconds
-     * @param period
-     * @return milliseconds
-     */
-    public static long formatMillisecondsByPeriod(long milliseconds,Period period){
-        return formatSecondsByPeriod(milliseconds/1000,period)*1000;
+    public static long formatMillisecondsByPeriod(long milliseconds, Period period) {
+        return formatSecondsByPeriod(milliseconds / 1000, period) * 1000;
     }
 
-    public static int getSecondsOfPeriod(Period period){
+    public static int getSecondsOfPeriod(Period period) {
         return period.toStandardSeconds().getSeconds();
     }
 
-    public static int getMillisecondsOfPeriod(Period period){
+    public static int getMillisecondsOfPeriod(Period period) {
         return getSecondsOfPeriod(period) * 1000;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ZookeeperEmbedded.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ZookeeperEmbedded.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ZookeeperEmbedded.java
index c7c358f..da2de0d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ZookeeperEmbedded.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/ZookeeperEmbedded.java
@@ -16,15 +16,14 @@
  */
 package org.apache.eagle.alert.utils;
 
-import java.io.File;
-import java.io.IOException;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.test.TestingServer;
+import java.io.File;
+import java.io.IOException;
 
 public class ZookeeperEmbedded {
     private TestingServer server;
@@ -52,7 +51,7 @@ public class ZookeeperEmbedded {
 
     public void shutdown() {
         try {
-            if(zookeeper!=null) {
+            if (zookeeper != null) {
                 if (!zookeeper.getState().equals(CuratorFrameworkState.STOPPED)) {
                     zookeeper.close();
                 }
@@ -61,7 +60,7 @@ public class ZookeeperEmbedded {
             e.printStackTrace();
         } finally {
             try {
-                if(server!=null) {
+                if (server != null) {
                     server.close();
                 }
             } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/log4j.properties
index fb13ad5..ba06033 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/log4j.properties
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/log4j.properties
@@ -12,9 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 log4j.rootLogger=DEBUG, stdout
-
 # standard output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
index 8123f45..a68af84 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
@@ -1,12 +1,4 @@
-package org.apache.eagle.alert.config;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership. The ASF
@@ -22,6 +14,14 @@ import org.junit.Test;
  * License for the specific language governing permissions and limitations under
  * the License.
  */
+package org.apache.eagle.alert.config;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
 public class TestConfigBus {
 
     @Ignore

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
index d1fb5f6..48bce2e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
@@ -1,19 +1,4 @@
-package org.apache.eagle.alert.metric;
-
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.JvmAttributeGaugeSet;
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.metric.sink.KafkaSink;
-import org.apache.eagle.alert.metric.source.JVMMetricSource;
-import org.apache.eagle.alert.metric.source.MetricSource;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -29,18 +14,35 @@ import java.util.Map;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.metric;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.JvmAttributeGaugeSet;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.metric.sink.KafkaSink;
+import org.apache.eagle.alert.metric.source.JVMMetricSource;
+import org.apache.eagle.alert.metric.source.MetricSource;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
 public class MetricSystemTest {
-    @Test @Ignore
-    public void testKafkaSink(){
+    @Test
+    @Ignore
+    public void testKafkaSink() {
         KafkaSink sink = new KafkaSink();
         MetricRegistry registry = new MetricRegistry();
         registry.registerAll(new JvmAttributeGaugeSet());
-        sink.prepare(ConfigFactory.load().getConfig("metric.sink.kafka"),registry);
+        sink.prepare(ConfigFactory.load().getConfig("metric.sink.kafka"), registry);
         sink.report();
         sink.stop();
     }
 
-    @Test @Ignore
+    @Test
+    @Ignore
     public void testMetricSystem() throws InterruptedException {
         MetricSystem system = MetricSystem.load(ConfigFactory.load());
         system.register(new JVMMetricSource());
@@ -50,8 +52,9 @@ public class MetricSystemTest {
         system.stop();
     }
 
-    @Test @Ignore
-    public void testMetaConflict(){
+    @Test
+    @Ignore
+    public void testMetaConflict() {
         MetricSystem system = MetricSystem.load(ConfigFactory.load());
         system.register(new MetaConflictMetricSource());
         system.start();
@@ -62,7 +65,7 @@ public class MetricSystemTest {
     private class MetaConflictMetricSource implements MetricSource {
         private MetricRegistry registry = new MetricRegistry();
 
-        public MetaConflictMetricSource(){
+        public MetaConflictMetricSource() {
             registry.register("meta.conflict", (Gauge<String>) () -> "meta conflict happening!");
         }
 
@@ -81,15 +84,17 @@ public class MetricSystemTest {
     private class SampleMetricSource implements MetricSource {
         private MetricRegistry registry = new MetricRegistry();
 
-        public SampleMetricSource(){
+        public SampleMetricSource() {
             registry.register("sample.long", (Gauge<Long>) System::currentTimeMillis);
-            registry.register("sample.map", (Gauge<Map<String, Object>>) () -> new HashMap<String, Object>(){
+            registry.register("sample.map", (Gauge<Map<String, Object>>) () -> new HashMap<String, Object>() {
                 private static final long serialVersionUID = 3948508906655117683L;
+
                 {
-                    put("int",1234);
-                    put("str","text");
-                    put("bool",true);
-                }});
+                    put("int", 1234);
+                    put("str", "text");
+                    put("bool", true);
+                }
+            });
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java
index 707515e..50b00d9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java
@@ -19,18 +19,18 @@
 
 package org.apache.eagle.alert.service;
 
-import java.util.List;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.List;
+
 public class TestMetadataServiceClientImpl {
     @SuppressWarnings("resource")
     @Ignore
     @Test
-    public void test() throws Exception{
+    public void test() throws Exception {
         MetadataServiceClientImpl impl = new MetadataServiceClientImpl("localhost", 58080, "/api/metadata/policies");
         List<PolicyDefinition> policies = impl.listPolicies();
         ObjectMapper mapper = new ObjectMapper();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
index 738c844..b54a5ac 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
@@ -14,19 +14,19 @@
 # limitations under the License.
 
 {
-	metric {
-		sink {
-			kafka {
-				"topic":"alert_metric_test"
-				"bootstrap.servers": "localhost:9092"
-			}
-			logger {
-				level = "INFO"
-			}
-			elasticsearch {
+  metric {
+    sink {
+      kafka {
+        "topic": "alert_metric_test"
+        "bootstrap.servers": "localhost:9092"
+      }
+      logger {
+        level = "INFO"
+      }
+      elasticsearch {
         hosts = ["10.64.223.222:9200"]
         index = "alert_metric_test"
       }
-		}
-	}
+    }
+  }
 }
\ No newline at end of file