You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/12/16 07:01:44 UTC

[2/5] incubator-eagle git commit: EAGLE-66 Typesafe Streaming DSL and KeyValue based Grouping

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NameConstants.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NameConstants.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NameConstants.scala
new file mode 100644
index 0000000..d4836e4
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NameConstants.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.utils
+
+object NameConstants {
+  val FIELD_PREFIX = "f"
+  val FIELD_KEY = "key"
+  val FIELD_VALUE = "value"
+  val FIELD_SEPARATOR = "_"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala
new file mode 100644
index 0000000..331cf7c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.utils
+
+import org.apache.eagle.datastream.core.StreamInfo
+
+case class NodeNameSelector(producer : StreamInfo) {
+  def getName : String = {
+    producer.name match {
+      case null => producer.toString+NameConstants.FIELD_SEPARATOR+producer.id
+      case _ => producer.name
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
new file mode 100644
index 0000000..1d48752
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.datastream.utils
+
+import scala.reflect.api
+import scala.reflect.runtime.{universe => ru}
+
+/**
+ * @since  12/7/15
+ */
+object Reflections{
+  private val UNIT_CLASS = classOf[Unit]
+  private val UNIT_TYPE_TAG = ru.typeTag[Unit]
+
+  /**
+   * Class to TypeTag
+   * @param clazz class
+   * @tparam T Type T
+   * @return the TypeTag for clazz, or null when clazz is null
+   */
+  def typeTag[T](clazz:Class[T]):ru.TypeTag[T]={
+    if(clazz == null){
+      null
+    }else if(clazz == UNIT_CLASS) {
+      UNIT_TYPE_TAG.asInstanceOf[ru.TypeTag[T]]
+    } else {
+      val mirror = ru.runtimeMirror(clazz.getClassLoader)
+      val sym = mirror.staticClass(clazz.getCanonicalName)
+      val tpe = sym.selfType
+      ru.TypeTag(mirror, new api.TypeCreator {
+        def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
+          if (m eq mirror) tpe.asInstanceOf[U#Type]
+          else throw new IllegalArgumentException(s"Type tag defined in $mirror cannot be migrated to other mirrors.")
+      })
+    }
+  }
+
+  def javaTypeClass[T](obj: AnyRef, index: Int = 0):Class[T] = JavaReflections.getGenericTypeClass(obj,index).asInstanceOf[Class[T]]
+  def javaTypeTag[T](obj: AnyRef, index: Int = 0):ru.TypeTag[T] = typeTag(JavaReflections.getGenericTypeClass(obj,index)).asInstanceOf[ru.TypeTag[T]]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala
new file mode 100644
index 0000000..4ac0cdc
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala
@@ -0,0 +1,43 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream.utils
+
+import java.util
+
+import org.apache.eagle.datastream.core.StreamProducer
+
+import scala.collection.JavaConverters._
+
+object UnionUtils {
+  def join[T1,T2](producers : StreamProducer[T1]*) : StreamProducer[T2] = {
+    producers.head.streamUnion(producers.drop(1))
+  }
+
+  def join[T1,T2](producers : java.util.List[StreamProducer[T1]]) : StreamProducer[T2] = {
+    val newList = new util.ArrayList(producers)
+    val head = newList.get(0)
+    newList.remove(0)
+    head.streamUnion(newList.asScala);
+  }
+
+  def join[T1,T2](producers : List[StreamProducer[T1]]) : StreamProducer[T2] = {
+    val head = producers.head
+    head.streamUnion(producers.tail);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java
new file mode 100644
index 0000000..c5f9045
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
+import org.junit.Test;
+
+/**
+ * @since 12/5/15
+ */
+public class TestExecutionEnvironmentJava {
+
+    @Test
+    public void testGetEnvInJava() {
+        StormExecutionEnvironment env0 = ExecutionEnvironments.get(StormExecutionEnvironment.class);
+        Assert.assertNotNull(env0);
+
+        StormExecutionEnvironment env1 = ExecutionEnvironments.get(new String[]{}, StormExecutionEnvironment.class);
+        Assert.assertNotNull(env1);
+        Config config = ConfigFactory.load();
+        StormExecutionEnvironment env2 = ExecutionEnvironments.get(config, StormExecutionEnvironment.class);
+        Assert.assertNotNull(env2);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
index 07c19de..979b09d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
@@ -33,22 +33,22 @@ public class TestJavaMain {
     }
 
     //@Test
-    public void testGeneral(){
-        Config config = ConfigFactory.load();
-        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
-        env.newSource(new TestKeyValueSpout()).renameOutputFields(2).groupBy(Arrays.asList(0)).flatMap(new GroupedEchoExecutor()).withParallelism(2);
-        env.execute();
-    }
+//    public void testGeneral(){
+//        Config config = ConfigFactory.load();
+//        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
+//        env.fromSpout(new TestKeyValueSpout()).withOutputFields(2).groupBy(Arrays.asList(0)).flatMap(new GroupedEchoExecutor()).parallelism(2);
+//        env.execute();
+//    }
 
-    //@Test
-    public void testMap(){
-        Config config = ConfigFactory.load();
-        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
-        SerializableFunction1 f1 = new SerializableFunction1<Object, Object>();
-        env.newSource(new TestKeyValueSpout()).renameOutputFields(2).
-                map1(f1);
-        env.execute();
-    }
+//    //@Test
+//    public void testMap(){
+//        Config config = ConfigFactory.load();
+//        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
+//        SerializableFunction1 f1 = new SerializableFunction1<Object, Object>();
+//        env.fromSpout(new TestKeyValueSpout()).withOutputFields(2).
+//                map1(f1);
+//        env.execute();
+//    }
 
     @Test
     public void test() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java
new file mode 100644
index 0000000..6d2d36f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream;
+
+import junit.framework.Assert;
+import org.apache.eagle.datastream.utils.Reflections;
+import org.junit.Test;
+import scala.reflect.api.TypeTags;
+
+/**
+ * @since 12/8/15
+ */
+public class TestJavaReflectionUtils {
+    @Test
+    public void testJavaFlatMapper(){
+        Class<String> clazz = Reflections.javaTypeClass(new JavaEchoExecutor(), 0);
+        Assert.assertEquals(String.class,clazz);
+        TypeTags.TypeTag typeTag = Reflections.typeTag(clazz);
+        Assert.assertNotNull(typeTag);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
index 5651e74..2a5043c 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
@@ -16,8 +16,7 @@
  */
 package org.apache.eagle.datastream;
 
-import org.apache.eagle.datastream.kafka.KafkaStreamMonitorApp;
-import org.apache.eagle.datastream.kafka.KafkaStreamMonitorApp;
+import org.apache.eagle.datastream.storm.KafkaStreamMonitorApp;
 
 /**
  * @since 11/7/15

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
index 5463079..4339e5a 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
@@ -19,46 +19,44 @@
 
 package org.apache.eagle.datastream
 
-import java.util
-
-import com.typesafe.config.{ConfigFactory, Config}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment
 
 object testStreamUnionExpansion extends App{
-  val config : Config = ConfigFactory.load;
+  val config : Config = ConfigFactory.load
   val env = new StormExecutionEnvironment(config)
-  val tail1 = env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
-  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a))
+  val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
+  val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a))
   tail1.streamUnion(List(tail2)).map1(a => "xyz")
-//  env.execute
 }
 
 object testStreamGroupbyExpansion extends App{
   val config : Config = ConfigFactory.load;
   val env = new StormExecutionEnvironment(config)
-  env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).groupBy(1).map2(a => ("key1",a))
+  env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).groupBy(1).map2(a => ("key1",a))
   //env.execute
 }
 
 object testStreamUnionAndGroupbyExpansion extends App{
   val config : Config = ConfigFactory.load;
   val env = new StormExecutionEnvironment(config)
-  val tail1 = env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(1)
-  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(0)
+  val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(1)
+  val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(0)
   tail1.streamUnion(List(tail2)).map1(a => "xyz")
-  //env.execute
+//  env.execute()
 }
 
 /**
  * 1. stream schema
  * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
  * 2. policy
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
+ * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
  */
 object testAlertExpansion extends App{
   val config : Config = ConfigFactory.load;
   val env = new StormExecutionEnvironment(config)
-  val tail1 = env.newSource(TestSpout()).withName("testSpout1")
-                  .flatMap(WordPrependForAlertExecutor("test")).withName("prepend")
+  val tail1 = env.fromSpout(TestSpout()).nameAs("testSpout1")
+                  .flatMap(WordPrependForAlertExecutor("test")).nameAs("prepend")
                   .alertWithConsumer("s1", "alert1")
   //env.execute
 }
@@ -68,14 +66,14 @@ object testAlertExpansion extends App{
  * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
  * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s2","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
  * 2. policy
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
+ * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
  */
 object testAlertExpansionWithUnion extends App{
   val config : Config = ConfigFactory.load;
   val env = new StormExecutionEnvironment(config)
-  val tail1 = env.newSource(TestSpout()).withName("testSpout1").flatMap(WordPrependForAlertExecutor("test")).withName("prepend") //.map2(a => ("key1",a))
-  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")) //.map2(a => ("key1",a))
-  tail1.streamUnion(List(tail2)).alert(util.Arrays.asList("s1","s2"), "alert1", true)
+  val tail1 = env.fromSpout(TestSpout()).nameAs("testSpout1").flatMap(WordPrependForAlertExecutor("test")).nameAs("prepend") //.map2(a => ("key1",a))
+  val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")) //.map2(a => ("key1",a))
+  tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = true)
   //env.execute
 }
 
@@ -83,7 +81,7 @@ object testAlertExpansionWithUnion extends App{
 object testStreamUnionExpansionWithSharedSpout extends App{
   val config : Config = ConfigFactory.load;
   val env = new StormExecutionEnvironment(config)
-  val source = env.newSource(TestSpout())
+  val source = env.fromSpout(TestSpout())
   val tail1 = source.flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
   val tail2 = source.flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a))
   tail1.streamUnion(List(tail2)).map1(a  => {
@@ -96,11 +94,11 @@ object testStreamUnionExpansionWithSharedSpout extends App{
 object testStreamUnionExpansionWithSharedSpout_2 extends App{
   val config : Config = ConfigFactory.load;
   val env = new StormExecutionEnvironment(config)
-  val source = env.newSource(TestSpout())
+  val source = env.fromSpout(TestSpout())
   val tail1 = source.flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
   source.streamUnion(List(tail1)).map1(a  => {
     println(a)
     "xyz"
   })
 //  env.execute
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
new file mode 100644
index 0000000..b43f42e
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import com.typesafe.config.ConfigFactory
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment
+
+/**
+ * @since  12/5/15
+ */
+object TestExecutionEnvironment extends App{
+  val env0 = ExecutionEnvironments.get[StormExecutionEnvironment]
+  println(env0)
+  val config = ConfigFactory.load()
+  val env1 = ExecutionEnvironments.get[StormExecutionEnvironment](config)
+  println(env1)
+  val env2 = ExecutionEnvironments.get[StormExecutionEnvironment](Array[String]("-D","key=value"))
+  println(env2)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
index c550182..27fdd88 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
@@ -16,9 +16,7 @@
  */
 package org.apache.eagle.datastream
 
-import java.util
-
-import com.typesafe.config.{ConfigFactory, Config}
+import com.typesafe.config.{Config, ConfigFactory}
 
 /**
  * explicit union
@@ -27,14 +25,14 @@ import com.typesafe.config.{ConfigFactory, Config}
  * 1. stream schema
  * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
  * 2. policy
- * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
+ * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
  */
 object UnionForAlert extends App{
   val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  val tail1 = env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
-  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key2",a))
-  tail1.streamUnion(List(tail2)).alert(util.Arrays.asList("s1","s2"), "alert1", false)
+  val env = ExecutionEnvironments.getStorm(config)
+  val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
+  val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key2",a))
+  tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = false)
   //env.execute
 }
 
@@ -43,10 +41,10 @@ object UnionForAlert extends App{
  */
 object TestAlertAfterFlatMap extends App{
   val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  val tail1 = env.newSource(TestSpout())
+  val env = ExecutionEnvironments.getStorm(config)
+  val tail1 = env.fromSpout(TestSpout())
                   .flatMap(WordPrependForAlertExecutor("test"))
-                  .alert(util.Arrays.asList("s1"), "alert1", false)
+                  .alert(Seq("s1"), "alert1", consume = false)
   //env.execute
 }
 
@@ -55,44 +53,44 @@ object TestAlertAfterFlatMap extends App{
  */
 object TestAlertAfterMap extends App{
   val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  val tail1 = env.newSource(TestSpout())
+  val env = ExecutionEnvironments.getStorm(config)
+  val tail1 = env.fromSpout(TestSpout())
     .flatMap(WordPrependForAlertExecutor2("test"))
     .map2(a => ("key", a))
-    .alert(util.Arrays.asList("s1"), "alert1", false)
+    .alert(Seq("s1"), "alert1", false)
   //env.execute
 }
 
 object StormRunnerWithoutSplitOrJoin extends Application{
   val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  env.newSource(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test"))
+  val env = ExecutionEnvironments.getStorm(config)
+  env.fromSpout(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test"))
     .flatMap(PatternAlertExecutor("test.*"))
-  //env.execute
+//  env.execute()
 }
 
 object StormRunnerWithSplit extends Application{
   val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  val toBeSplit = env.newSource(TestSpout()).flatMap(EchoExecutor())
+  val env = ExecutionEnvironments.getStorm(config)
+  val toBeSplit = env.fromSpout(TestSpout()).flatMap(EchoExecutor())
   toBeSplit.flatMap(WordPrependExecutor("test")).flatMap(PatternAlertExecutor("test.*"))
   toBeSplit.flatMap(WordAppendExecutor("test"))
-  //env.execute
+//  env.execute()
 }
 
 object StormRunnerWithUnion extends Application{
   val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  val tail1 = env.newSource(TestSpout()).flatMap(WordPrependExecutor("test"))
-  val tail2 = env.newSource(TestSpout()).flatMap(WordAppendExecutor("test"))
+  val env = ExecutionEnvironments.getStorm(config)
+  val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependExecutor("test"))
+  val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendExecutor("test"))
   tail1.streamUnion(List(tail2)).flatMap(PatternAlertExecutor(".*test.*"))
-  //env.execute
+  env.execute()
 }
 
 object StormRunnerWithFilter extends Application{
   val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  env.newSource(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")).
+  val env = ExecutionEnvironments.getStorm(config)
+  env.fromSpout(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")).
     filter(_=>false).
     flatMap(PatternAlertExecutor("test.*"))
   //env.execute
@@ -100,8 +98,8 @@ object StormRunnerWithFilter extends Application{
 
 object StormRunnerWithJavaExecutor extends Application{
   val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  env.newSource(TestSpout()).flatMap(new JavaEchoExecutor()).flatMap(WordPrependExecutor("test")).
+  val env = ExecutionEnvironments.getStorm(config)
+  env.fromSpout(TestSpout()).flatMap(new JavaEchoExecutor()).flatMap(WordPrependExecutor("test")).
     filter(_=>false).
     flatMap(PatternAlertExecutor("test.*"))
   //env.execute
@@ -109,14 +107,14 @@ object StormRunnerWithJavaExecutor extends Application{
 
 object StormRunnerWithKeyValueSpout extends Application{
   val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  env.newSource(TestKeyValueSpout()).groupBy(1).flatMap(new GroupedEchoExecutor()).withParallelism(2)
+  val env = ExecutionEnvironments.getStorm(config)
+  env.fromSpout(TestKeyValueSpout()).groupBy(1).flatMap(new GroupedEchoExecutor()).parallelism(2)
   //env.execute
 }
 
 object StormRunnerWithKeyValueSpoutRenameOutputFields extends Application{
   val config : Config = ConfigFactory.load;
-  val env = ExecutionEnvironmentFactory.getStorm(config)
-  env.newSource(TestKeyValueSpout()).renameOutputFields(2).groupBy(0).flatMap(new GroupedEchoExecutor()).withParallelism(2)
+  val env = ExecutionEnvironments.getStorm(config)
+  env.fromSpout(TestKeyValueSpout()).withOutputFields(2).groupBy(0).flatMap(new GroupedEchoExecutor()).parallelism(2)
   //env.execute
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
new file mode 100644
index 0000000..cf95304
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.datastream
+
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment
+
+/**
+ * @since  12/4/15
+ */
+case class Entity(name:String,value:Double,var inc:Int=0)
+
+object TestIterableWithGroupBy extends App {
+
+  val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
+
+  val tuples = Seq(
+    Entity("a", 1),
+    Entity("a", 2),
+    Entity("a", 3),
+    Entity("b", 2),
+    Entity("c", 3),
+    Entity("d", 3)
+  )
+
+  env.from(tuples)
+    .groupByKey(_.name)
+    .map(o => {o.inc += 2;o})
+    .filter(_.name != "b")
+    .filter(_.name != "c")
+    .groupByKey(o=>(o.name,o.value))
+    .map(o => (o.name,o))
+    .map(o => (o._1,o._2.value,o._2.inc))
+    .foreach(println)
+
+  env.execute()
+}
+
+object TestIterableWithGroupByCircularly extends App{
+  val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
+  val tuples = Seq(
+    Entity("a", 1),
+    Entity("a", 2),
+    Entity("a", 3),
+    Entity("b", 2),
+    Entity("c", 3),
+    Entity("d", 3)
+  )
+  env.from(tuples,recycle = true)
+    .map(o => {o.inc += 2;o})
+    .groupByKey(_.name)
+    .foreach(println)
+  env.execute()
+}
+
+object TestGroupByKeyOnSpoutproxy extends App{
+  val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
+
+  val tuples = Seq(
+    Entity("a", 1),
+    Entity("a", 2),
+    Entity("a", 3),
+    Entity("b", 2),
+    Entity("c", 3),
+    Entity("d", 3)
+  )
+
+  env.fromSpout[String](TestSpout())
+    .groupByKey(_.charAt(0))
+    .foreach(println)
+  env.execute()
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java
new file mode 100644
index 0000000..7e66478
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java
@@ -0,0 +1,26 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream;
+
+import java.util.List;
+
+public interface JavaMapper {
+    List<Object> map(List<Object> input);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java
new file mode 100644
index 0000000..39ce6c2
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java
@@ -0,0 +1,24 @@
+package org.apache.eagle.datastream;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @since 12/8/15
+ */
+public interface JavaTypeCompatible {
+    Class<?> getType();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
index 5c89a41..63aa4fb 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
@@ -21,26 +21,18 @@ trait EagleTuple extends Serializable{
 }
 
 case class Tuple1[T0](f0 : T0) extends EagleTuple{
-  override def getList : List[AnyRef] = {
-    return List(f0.asInstanceOf[AnyRef])
-  }
+  override def getList : List[AnyRef] = List(f0.asInstanceOf[AnyRef])
+
 }
 
 case class Tuple2[T0, T1](f0 : T0, f1: T1) extends EagleTuple{
-  override def getList : List[AnyRef] = {
-    return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef])
-  }
+  override def getList : List[AnyRef] = List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef])
 }
 
 case class Tuple3[T0, T1, T2](f0 : T0, f1: T1, f2: T2) extends EagleTuple{
-  override def getList : List[AnyRef] = {
-    return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef])
-  }
+  override def getList : List[AnyRef] = List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef])
 }
 
 case class Tuple4[T0, T1, T2, T3](f0 : T0, f1: T1, f2: T2, f3 : T3) extends EagleTuple{
-  override def getList : List[AnyRef] = {
-    return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef], f3.asInstanceOf[AnyRef])
-  }
-}
-
+  override def getList : List[AnyRef] = List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef], f3.asInstanceOf[AnyRef])
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
index 22e063d..8569ae5 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
@@ -16,6 +16,6 @@
  */
 package org.apache.eagle.datastream
 
-trait FlatMapper[T, R] extends Serializable{
-  def flatMap(input : T, collector : Collector[R])
+trait FlatMapper[T] extends Serializable {
+  def flatMap(input : Seq[AnyRef], collector : Collector[T])
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
index b3d77e9..22ae213 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
@@ -17,18 +17,23 @@
 package org.apache.eagle.datastream
 
 import com.typesafe.config.Config
+import scala.collection.JavaConverters._
 
-trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
+trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[R] {
   def prepareConfig(config : Config)
   def init
   def fields : Array[String]
 }
 
-trait JavaStormStreamExecutor[R <: EagleTuple] extends FlatMapper[java.util.List[AnyRef], R] {
+trait JavaStormStreamExecutor[R <: EagleTuple] extends FlatMapper[R] {
   def prepareConfig(config : Config)
   def init
   def fields : Array[String]
   override def toString() = this.getClass.getSimpleName
+
+  override def flatMap(input : Seq[AnyRef], collector : Collector[R]) = flatMap(input.asJava,collector)
+
+  def flatMap(input : java.util.List[AnyRef], collector : Collector[R])
 }
 
 abstract class StormStreamExecutor1[T0] extends StormStreamExecutor[Tuple1[T0]] {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
index d9363e8..2b2b1ac 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
@@ -20,10 +20,12 @@ package org.apache.eagle.service.client;
 import com.typesafe.config.Config;
 import org.apache.eagle.common.config.EagleConfigConstants;
 
+import java.io.Serializable;
+
 /**
  * Some common codes to enable DAO through eagle service including service host/post, credential population etc.
  */
-public class EagleServiceConnector {
+public class EagleServiceConnector implements Serializable{
     private final String eagleServiceHost;
     private final Integer eagleServicePort;
     private String username;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
index 218b812..4eeda05 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
@@ -19,12 +19,10 @@ package org.apache.eagle.metric.kafka;
 import backtype.storm.spout.SchemeAsMultiScheme;
 import backtype.storm.topology.base.BaseRichSpout;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
-import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
-import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.BrokerHosts;
@@ -42,13 +40,8 @@ public class EagleMetricCollectorMain {
     private static final Logger LOG = LoggerFactory.getLogger(EagleMetricCollectorMain.class);
 
     public static void main(String[] args) throws Exception {
-        new ConfigOptionParser().load(args);
-        //System.setProperty("config.resource", "/application.local.conf");
-
-        Config config = ConfigFactory.load();
-
-        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
-
+        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
+        Config config = env.getConfig();
         String deserClsName = config.getString("dataSourceConfig.deserializerClass");
         final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
             @Override
@@ -60,6 +53,8 @@ public class EagleMetricCollectorMain {
             }
         };
 
+
+        // TODO: Refactor the anonymous class into an independent class file, to avoid overly complex logic in the main method
         KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() {
             @Override
             public BaseRichSpout getSpout(Config context) {
@@ -119,9 +114,9 @@ public class EagleMetricCollectorMain {
             }
         };
 
-        env.newSource(new KafkaOffsetSourceSpoutProvider().getSpout(config)).renameOutputFields(0).withName("kafkaLogLagChecker");
-        env.newSource(kafkaMessageSpoutProvider.getSpout(config)).renameOutputFields(2).withName("kafkaMessageFetcher").groupBy(Arrays.asList(0))
+        env.fromSpout(new KafkaOffsetSourceSpoutProvider()).withOutputFields(0).nameAs("kafkaLogLagChecker");
+        env.fromSpout(kafkaMessageSpoutProvider).withOutputFields(2).nameAs("kafkaMessageFetcher").groupBy(Arrays.asList(0))
                 .flatMap(new KafkaMessageDistributionExecutor());
         env.execute();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
index 5fd02fd..8f25564 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
@@ -19,14 +19,11 @@ package org.apache.eagle.metric.kafka;
 import backtype.storm.topology.base.BaseRichSpout;
 import com.typesafe.config.Config;
 import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
 import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
 import org.apache.eagle.service.client.ServiceConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class KafkaOffsetSourceSpoutProvider {
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetSourceSpoutProvider.class);
-	
+public class KafkaOffsetSourceSpoutProvider implements StormSpoutProvider {
 	public BaseRichSpout getSpout(Config config){
 
 		ZKStateConfig zkStateConfig = new ZKStateConfig();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java
index e4e63c9..00b6ca7 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java
@@ -49,4 +49,4 @@ public class FileSensitivityAPIEntity extends TaggedLogAPIEntity{
 		this.sensitivityType = sensitivityType;
 		valueChanged("sensitivityType");
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
index e808502..7d32091 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
@@ -20,7 +20,6 @@
 package org.apache.eagle.security.partition;
 
 import com.sun.jersey.api.client.WebResource;
-import org.apache.commons.lang.time.DateUtils;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.metric.MetricConstants;
 import org.apache.eagle.partition.DataDistributionDao;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogProcessorMain.java b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogProcessorMain.java
index 1a90cbd..b00f39c 100644
--- a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogProcessorMain.java
+++ b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogProcessorMain.java
@@ -17,32 +17,17 @@
  */
 package org.apache.eagle.security.hbase;
 
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigRenderOptions;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
-import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
-import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
 import org.apache.eagle.security.hbase.sensitivity.HbaseResourceSensitivityDataJoinExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class HbaseAuditLogProcessorMain {
-    private static final Logger LOG = LoggerFactory.getLogger(HbaseAuditLogProcessorMain.class);
-
     public static void main(String[] args) throws Exception{
-        Config config = new ConfigOptionParser().load(args);
-
-        LOG.info("Config class: " + config.getClass().getCanonicalName());
-
-        if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
-
-        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
-        env.newSource(new KafkaSourcedSpoutProvider().getSpout(config)).renameOutputFields(1).withName("kafkaMsgConsumer")
+        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
+        env.fromSpout(new KafkaSourcedSpoutProvider()).withOutputFields(1).nameAs("kafkaMsgConsumer")
                 .flatMap(new HbaseResourceSensitivityDataJoinExecutor())
                 .alertWithConsumer("hbaseSecurityLogEventStream", "hbaseSecurityLogAlertExecutor");
         env.execute();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
index 2e8b852..5bb2aff 100644
--- a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
+++ b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
@@ -55,7 +55,6 @@ public class HbaseResourceSensitivityDataJoinExecutor extends JavaStormStreamExe
         }
     }
 
-
     @Override
     public void flatMap(List<Object> input, Collector<Tuple2<String, Map>> outputCollector){
         @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/run_auditlog_topology.sh
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/run_auditlog_topology.sh b/eagle-security/eagle-security-hdfs-auditlog/run_auditlog_topology.sh
index 8e8d969..38d8ce9 100755
--- a/eagle-security/eagle-security-hdfs-auditlog/run_auditlog_topology.sh
+++ b/eagle-security/eagle-security-hdfs-auditlog/run_auditlog_topology.sh
@@ -1,2 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 export JAVA_HOME=$(/usr/libexec/java_home -v 1.7)
 mvn -X exec:java -Dexec.mainClass="eagle.security.auditlog.HdfsAuditLogProcessorMain"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/run_hostname_lookkup.sh
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/run_hostname_lookkup.sh b/eagle-security/eagle-security-hdfs-auditlog/run_hostname_lookkup.sh
index ba761de..fffd8c0 100755
--- a/eagle-security/eagle-security-hdfs-auditlog/run_hostname_lookkup.sh
+++ b/eagle-security/eagle-security-hdfs-auditlog/run_hostname_lookkup.sh
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 ### refer to http://mojo.codehaus.org/exec-maven-plugin/usage.html
 ### java goal is to execute program within maven JVM
 export JAVA_HOME=$(/usr/libexec/java_home -v 1.7)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/run_message_producer.sh
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/run_message_producer.sh b/eagle-security/eagle-security-hdfs-auditlog/run_message_producer.sh
index 9ecaa26..dbe50f3 100755
--- a/eagle-security/eagle-security-hdfs-auditlog/run_message_producer.sh
+++ b/eagle-security/eagle-security-hdfs-auditlog/run_message_producer.sh
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 export JAVA_HOME=$(/usr/libexec/java_home -v 1.7)
 ### mvn -X exec:java  -Dexec.args="-input /Users/user1/Downloads/hdfs-audit.log.2015-01-22-22 -maxNum 10000000 -topic hdfs_audit_log" -Pproducer
 mvn exec:java -Dexec.mainClass="eagle.security.auditlog.util.AuditLogKafkaProducer" -Dexec.args="-input /Users/user1/Downloads/hdfs-audit.log.2015-01-22-22 -maxNum 20000 -topic hdfs_audit_log"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/run_message_producer_in_assembly.sh
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/run_message_producer_in_assembly.sh b/eagle-security/eagle-security-hdfs-auditlog/run_message_producer_in_assembly.sh
index 20d7cae..128926d 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/run_message_producer_in_assembly.sh
+++ b/eagle-security/eagle-security-hdfs-auditlog/run_message_producer_in_assembly.sh
@@ -1 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 java -cp target/eagle-app-security-activity-monitoring-dataproc-1.0.0-jar-with-dependencies.jar:/Users/user1/.m2/repository/org/slf4j/slf4j-api/1.4.3/slf4j-api-1.4.3.jar:/Users/user1/.m2/repository/org/slf4j/slf4j-log4j12/1.4.3/slf4j-log4j12-1.4.3.jar  eagle.app.security.dataproc.util.AuditLogKafkaProducer -input /Users/user1/projects/eagle-app-security-activity-monitoring/eagle-app-security-activity-monitoring-dataproc/src/main/resources/auditlog/auditlog.1  --topic hdfs_audit_log

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/src/assembly/eagle-dam-auditlog-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/assembly/eagle-dam-auditlog-assembly.xml b/eagle-security/eagle-security-hdfs-auditlog/src/assembly/eagle-dam-auditlog-assembly.xml
index f3b17d4..0acf619 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/assembly/eagle-dam-auditlog-assembly.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/assembly/eagle-dam-auditlog-assembly.xml
@@ -1,3 +1,20 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
 <assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
index eb84fd3..f63a9be 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
@@ -20,32 +20,27 @@ package org.apache.eagle.security.auditlog;
 
 import backtype.storm.spout.SchemeAsMultiScheme;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
-import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
-import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.core.StreamProducer;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
 import org.apache.eagle.partition.DataDistributionDao;
 import org.apache.eagle.partition.PartitionAlgorithm;
 import org.apache.eagle.partition.PartitionStrategy;
 import org.apache.eagle.partition.PartitionStrategyImpl;
 import org.apache.eagle.security.partition.DataDistributionDaoImpl;
 import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
-import org.apache.eagle.datastream.StreamProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
 public class HdfsAuditLogProcessorMain {
-	private static final Logger LOG = LoggerFactory.getLogger(HdfsAuditLogProcessorMain.class);
-
     public static PartitionStrategy createStrategy(Config config) {
+        // TODO: Refactor configuration structure to avoid repeated config processing ~ hao
         String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
         Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
         String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
@@ -72,6 +67,7 @@ public class HdfsAuditLogProcessorMain {
                          return Arrays.asList(map.get("user"), tmp);
                  }
          };
+
          KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
                  @Override
                  public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
@@ -81,8 +77,9 @@ public class HdfsAuditLogProcessorMain {
          return provider;
     }
 
-    public static void execWithDefaultPartition(Config config, StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
-        StreamProducer source = env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").groupBy(Arrays.asList(0));
+    @SuppressWarnings("unchecked")
+    public static void execWithDefaultPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
+        StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(Arrays.asList(0));
         StreamProducer reassembler = source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0));
         source.streamUnion(reassembler)
               .flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
@@ -91,9 +88,10 @@ public class HdfsAuditLogProcessorMain {
         env.execute();
     }
 
-    public static void execWithBalancedPartition(Config config, StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
-        PartitionStrategy strategy = createStrategy(config);
-        StreamProducer source = env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").customGroupBy(strategy);
+    @SuppressWarnings("unchecked")
+    public static void execWithBalancedPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
+        PartitionStrategy strategy = createStrategy(env.getConfig());
+        StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(strategy);
         StreamProducer reassembler = source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0));
         source.streamUnion(reassembler)
                 .flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
@@ -103,18 +101,14 @@ public class HdfsAuditLogProcessorMain {
     }
 
 	public static void main(String[] args) throws Exception{
-        Config config = new ConfigOptionParser().load(args);
-        LOG.info("Config class: " + config.getClass().getCanonicalName());
-        if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
-
-        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
-        KafkaSourcedSpoutProvider provider = createProvider(config);
-        Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") ? config.getBoolean("eagleProps.balancePartitionEnabled") : false;
+        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
+        Config config = env.getConfig();
+        KafkaSourcedSpoutProvider provider = createProvider(env.getConfig());
+        Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") && config.getBoolean("eagleProps.balancePartitionEnabled");
         if (balancePartition) {
-            execWithBalancedPartition(config, env, provider);
-        }
-        else {
-            execWithDefaultPartition(config, env, provider);
+            execWithBalancedPartition(env, provider);
+        } else {
+            execWithDefaultPartition(env, provider);
         }
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
index c6894dc..c2d50db 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
@@ -36,9 +36,6 @@ import org.wso2.siddhi.core.stream.input.InputHandler;
 
 import java.util.*;
 
-/**
- * Created by yonzhang on 11/20/15.
- */
 public class HdfsUserCommandReassembler extends JavaStormStreamExecutor2<String, Map> {
     private static final Logger LOG = LoggerFactory.getLogger(HdfsUserCommandReassembler.class);
     private Config config;
@@ -142,14 +139,14 @@ public class HdfsUserCommandReassembler extends JavaStormStreamExecutor2<String,
 
     @Override
     public void flatMap(List<Object> input, Collector<Tuple2<String, Map>> collector) {
-        LOG.debug("incoming event:" + input.get(1));
-        SortedMap<String, Object> toBeCopied = (SortedMap<String, Object>)input.get(1);
-        SortedMap<String, Object> event = new TreeMap<String, Object>(toBeCopied);
+        if(LOG.isDebugEnabled()) LOG.debug("incoming event:" + input.get(1));
+        SortedMap<String, Object> toBeCopied = (SortedMap<String, Object>) input.get(1);
+        SortedMap<String, Object> event = new TreeMap<>(toBeCopied);
         Object[] siddhiEvent = convertToSiddhiEvent(collector, event);
         try {
             inputHandler.send(siddhiEvent);
-        }catch(Exception ex){
-            LOG.error("fail sending event to Siddhi pattern engine", ex);
+        } catch (Exception ex){
+            LOG.error("Fail sending event to Siddhi pattern engine", ex);
             throw new IllegalStateException(ex);
         }
     }
@@ -163,4 +160,4 @@ public class HdfsUserCommandReassembler extends JavaStormStreamExecutor2<String,
         }
         return siddhiEvent;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index 1d13082..5a82fda 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -55,7 +55,7 @@
     #"kafkaStatisticRangeInMin" : 60,
     "eagleService": {
       "host": "localhost",
-      "port": 9099,
+      "port": 38080,
       "username": "admin",
       "password": "secret"
     },

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/auditlog/auditlog.1
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/auditlog/auditlog.1 b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/auditlog/auditlog.1
deleted file mode 100644
index 7d470f4..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/auditlog/auditlog.1
+++ /dev/null
@@ -1,16 +0,0 @@
-2015-09-21 21:36:52,172 INFO FSNamesystem.audit: allowed=true   ugi=ambari-qa (auth:SIMPLE)     ip=/10.0.2.15   cmd=getfileinfo src=/tmp/hive   dst=null        perm=null       proto=rpc
-2015-09-21 21:36:52,268 INFO FSNamesystem.audit: allowed=true   ugi=ambari-qa (auth:SIMPLE)     ip=/10.0.2.15   cmd=getfileinfo src=/tmp/hive   dst=null        perm=null       proto=rpc
-2015-09-21 21:36:52,274 INFO FSNamesystem.audit: allowed=true   ugi=ambari-qa (auth:SIMPLE)     ip=/10.0.2.15   cmd=getfileinfo src=/tmp/hive/ambari-qa dst=null        perm=null       proto=rpc
-2015-09-21 21:36:52,430 INFO FSNamesystem.audit: allowed=true   ugi=ambari-qa (auth:SIMPLE)     ip=/10.0.2.15   cmd=getfileinfo src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929    dst=null        perm=null       proto=rpc
-2015-09-21 21:36:52,437 INFO FSNamesystem.audit: allowed=true   ugi=ambari-qa (auth:SIMPLE)     ip=/10.0.2.15   cmd=mkdirs      src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929    dst=null        perm=ambari-qa:hdfs:rwx------   proto=rpc
-2015-09-21 21:36:52,443 INFO FSNamesystem.audit: allowed=true   ugi=ambari-qa (auth:SIMPLE)     ip=/10.0.2.15   cmd=getfileinfo src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929    dst=null        perm=null       proto=rpc
-2015-09-21 21:36:52,448 INFO FSNamesystem.audit: allowed=true   ugi=ambari-qa (auth:SIMPLE)     ip=/10.0.2.15   cmd=getfileinfo src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929/_tmp_space.db      dst=null        perm=null       proto=rpc
-2015-09-21 21:36:52,451 INFO FSNamesystem.audit: allowed=true   ugi=ambari-qa (auth:SIMPLE)     ip=/10.0.2.15   cmd=mkdirs      src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929/_tmp_space.db      dst=null        perm=ambari-qa:hdfs:rwx------   proto=rpc
-2015-09-21 21:36:52,453 INFO FSNamesystem.audit: allowed=true   ugi=ambari-qa (auth:SIMPLE)     ip=/10.0.2.15   cmd=getfileinfo src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929/_tmp_space.db      dst=null        perm=null       proto=rpc
-2015-09-21 21:36:55,455 INFO FSNamesystem.audit: allowed=true   ugi=ambari-qa (auth:SIMPLE)     ip=/10.0.2.15   cmd=delete      src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929    dst=null        perm=null       proto=rpc
-2015-09-21 21:36:55,477 INFO FSNamesystem.audit: allowed=true   ugi=ambari-qa (auth:SIMPLE)     ip=/10.0.2.15   cmd=getfileinfo src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929    dst=null        perm=null       proto=rpc
-2015-09-21 21:36:55,480 INFO FSNamesystem.audit: allowed=true   ugi=ambari-qa (auth:SIMPLE)     ip=/10.0.2.15   cmd=getfileinfo src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929/_tmp_space.db      dst=null        perm=null       proto=rpc
-2015-09-21 21:37:20,809 INFO FSNamesystem.audit: allowed=true   ugi=oozie (auth:SIMPLE) ip=/10.0.2.15   cmd=listStatus  src=/user/oozie/share/lib       dst=null        perm=null       proto=rpc
-2015-09-21 21:37:24,330 INFO FSNamesystem.audit: allowed=false  ugi=hive (auth:SIMPLE)  ip=/10.0.2.15   cmd=getfileinfo src=/ranger/audit/hiveServer2/20150921/sandbox.hortonworks.com-audit.log        dst=null        perm=null       proto=rpc
-2015-09-21 21:37:24,334 INFO FSNamesystem.audit: allowed=false  ugi=hive (auth:SIMPLE)  ip=/10.0.2.15   cmd=getfileinfo src=/ranger/audit/hiveServer2/20150921/sandbox.hortonworks.com-audit-1.log      dst=null        perm=null       proto=rpc
-2015-09-21 21:37:24,338 INFO FSNamesystem.audit: allowed=false  ugi=hive (auth:SIMPLE)  ip=/10.0.2.15   cmd=getfileinfo src=/ranger/audit/hiveServer2/20150921  dst=null        perm=null       proto=rpc
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
index 16368cd..a68a323 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 topology.workers: 1
 topology.acker.executors: 1
 topology.tasks: 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-securitylog/src/main/java/org/apache/eagle/security/securitylog/HDFSSecurityLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-securitylog/src/main/java/org/apache/eagle/security/securitylog/HDFSSecurityLogProcessorMain.java b/eagle-security/eagle-security-hdfs-securitylog/src/main/java/org/apache/eagle/security/securitylog/HDFSSecurityLogProcessorMain.java
index 9238cad..07bdd3b 100644
--- a/eagle-security/eagle-security-hdfs-securitylog/src/main/java/org/apache/eagle/security/securitylog/HDFSSecurityLogProcessorMain.java
+++ b/eagle-security/eagle-security-hdfs-securitylog/src/main/java/org/apache/eagle/security/securitylog/HDFSSecurityLogProcessorMain.java
@@ -17,32 +17,15 @@
  */
 
 package org.apache.eagle.security.securitylog;
-
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigRenderOptions;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
-import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
-import org.apache.eagle.datastream.StormExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
 
 public class HDFSSecurityLogProcessorMain {
-    private static final Logger LOG = LoggerFactory.getLogger(HDFSSecurityLogProcessorMain.class);
-
     public static void main(String[] args) throws Exception{
-        Config config = new ConfigOptionParser().load(args);
-
-        LOG.info("Config class: " + config.getClass().getCanonicalName());
-
-        if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
-
-        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
-
-        env.newSource(new KafkaSourcedSpoutProvider().getSpout(config)).renameOutputFields(1).withName("kafkaMsgConsumer")
+        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
+        env.fromSpout(new KafkaSourcedSpoutProvider()).withOutputFields(1).nameAs("kafkaMsgConsumer")
                 .alertWithConsumer("hdfsSecurityLogEventStream", "hdfsSecurityLogAlertExecutor");
         env.execute();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSFileSystem.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSFileSystem.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSFileSystem.java
index 9a17075..67e4092 100644
--- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSFileSystem.java
+++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSFileSystem.java
@@ -1,12 +1,13 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,