You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:41 UTC

[34/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala
new file mode 100644
index 0000000..48dc7e5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{FlatSpec, Matchers}
+
+class TestStreamDAGBuilder extends FlatSpec with Matchers{
+//  "a single source DAG with groupBy" should "be traversed without groupBy node" in {
+//    val config = ConfigFactory.load()
+//    val env = ExecutionEnvironmentFactory.getStorm(config)
+//    val tail = env.newSource(null).flatMap(EchoExecutor()).groupBy(0).flatMap(WordPrependExecutor("test"))
+//    val dag = new StreamDAGBuilder(env.heads).build()
+//    val iter = dag.iterator()
+//    assert(iter.hasNext)
+//    iter.next() match{
+//      case StormSourceProducer(t) => assert(t == null)
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match{
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match{
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+//      case _ => assert(false)
+//    }
+//  }
+//
+//  "a single source DAG with groupBy from spout" should "be traversed without groupBy node" in {
+//    val config = ConfigFactory.load()
+//    val env = ExecutionEnvironmentFactory.getStorm(config)
+//    val tail = env.newSource(null).groupBy(0).flatMap(WordPrependExecutor("test"))
+//    val dag = new StreamDAGBuilder(env.heads).build()
+//    val iter = dag.iterator()
+//    assert(iter.hasNext)
+//    iter.next() match{
+//      case StormSourceProducer(t) => assert(t == null)
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match{
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+//      case _ => assert(false)
+//    }
+//  }
+//
+//  "a single source DAG with groupBy from spout and then split" should "be traversed without groupBy node" in {
+//    val config = ConfigFactory.load()
+//    val env = ExecutionEnvironmentFactory.getStorm(config)
+//    val groupby = env.newSource(null).groupBy(0)
+//    groupby.flatMap(WordPrependExecutor("test"))
+//    groupby.flatMap(WordAppendExecutor("test"))
+//    val dag = new StreamDAGBuilder(env.heads).build()
+//    val iter = dag.iterator()
+//    assert(iter.hasNext)
+//    iter.next() match{
+//      case StormSourceProducer(t) => assert(t == null)
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match{
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match{
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
+//      case _ => assert(false)
+//    }
+//  }
+//
+//  "a single source DAG without stream join" should "be traversed sequentially like specified" in{
+//    val config = ConfigFactory.load()
+//    val env = ExecutionEnvironmentFactory.getStorm(config)
+//    val tail = env.newSource(null).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test"))
+//    val dag = new StreamDAGBuilder(env.heads).build()
+//    val iter = dag.iterator()
+//    assert(iter.hasNext)
+//    iter.next() match{
+//      case StormSourceProducer(t) => assert(t == null)
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match{
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match{
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+//      case _ => assert(false)
+//    }
+//  }
+//
+//  "a single source with split" should "has more than one tail producer" in {
+//    val config = ConfigFactory.load()
+//    val env = ExecutionEnvironmentFactory.getStorm(config)
+//    val echo = env.newSource(null).flatMap(EchoExecutor())
+//    val tail1 = echo.flatMap(WordPrependExecutor("test"))
+//    val tail2 = echo.flatMap(WordAppendExecutor("test"))
+//    val dag = new StreamDAGBuilder(env.heads).build()
+//    val iter = dag.iterator()
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case StormSourceProducer(t) => assert(t == null)
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
+//      case _ => assert(false)
+//    }
+//  }
+//
+//  "a single source with split and join" should "has join" in {
+//    val config = ConfigFactory.load()
+//    val env = ExecutionEnvironmentFactory.getStorm(config)
+//    val echo = env.newSource(null).flatMap(EchoExecutor())
+//    val tail1 = echo.flatMap(WordPrependExecutor("test"))
+//    val tail2 = echo.flatMap(WordAppendExecutor("test")).filter(_=>true).streamUnion(List(tail1)).
+//      flatMap(PatternAlertExecutor("test*"))
+//    val dag = new StreamDAGBuilder(env.heads).build()
+//    val iter = dag.iterator()
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case StormSourceProducer(t) => assert(t == null)
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case FilterProducer(fn) =>
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case FlatMapProducer(worker) => assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("test*"))
+//      case _ => assert(false)
+//    }
+//    assert(!iter.hasNext)
+//  }
+//
+//  "multiple sources with split and union" should "has union" in {
+//    val config = ConfigFactory.load()
+//    val env = ExecutionEnvironmentFactory.getStorm(config)
+//    val source1 = env.newSource(TestSpout())
+//    val source2 = env.newSource(TestSpout())
+//    val source3 = env.newSource(TestSpout())
+//
+//    val tail1 = source1.flatMap(WordPrependExecutor("test"))
+//    val tail2 = source2.filter(_=>true)
+//    val tail3 = source3.flatMap(WordAppendExecutor("test")).streamUnion(List(tail1, tail2)).
+//      flatMap(PatternAlertExecutor("abc*"))
+//
+//    val dag = new StreamDAGBuilder(env.heads).build()
+//    val iter = dag.iterator()
+//
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case StormSourceProducer(t) => assert(t.isInstanceOf[TestSpout])
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case StormSourceProducer(t) => assert(t.isInstanceOf[String])
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case FilterProducer(fn) =>
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case SourceProducer(t) => assert(t.isInstanceOf[String])
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
+//      case _ => assert(false)
+//    }
+//    assert(iter.hasNext)
+//    iter.next() match {
+//      case FlatMapProducer(worker) => assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("abc*"))
+//      case _ => assert(false)
+//    }
+//    assert(!iter.hasNext)
+//  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/EagleOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/EagleOutputCollector.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/EagleOutputCollector.java
deleted file mode 100644
index ef4e0cb..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/EagleOutputCollector.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.core;
-
-import java.io.Serializable;
-
-/**
- * expose simple interface for streaming executor to populate output data
- *
- */
-public interface EagleOutputCollector extends Serializable{
-	void collect(ValuesArray t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/JsonSerDeserUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/JsonSerDeserUtils.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/JsonSerDeserUtils.java
deleted file mode 100644
index 69dff5d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/JsonSerDeserUtils.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.core;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public class JsonSerDeserUtils {
-	private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeserUtils.class);
-
-	public static <T> String serialize(T o) throws Exception{
-		return serialize(o, null);
-	}
-	
-	public static <T> String serialize(T o, List<Module> modules) throws Exception {
-		ObjectMapper mapper = new ObjectMapper();
-		if (modules != null) { 
-			mapper.registerModules(modules);
-		}
-		return mapper.writeValueAsString(o);
-	}
-
-	public static <T> T deserialize(String value, Class<T> cls) throws Exception{
-		return deserialize(value, cls, null);
-	}
-	
-	public static <T> T deserialize(String value, Class<T> cls, List<Module> modules) throws Exception{
-		ObjectMapper mapper = new ObjectMapper();
-		if (modules != null) { 
-			mapper.registerModules(modules);
-		}
-		return mapper.readValue(value, cls);	
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/StreamingProcessConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/StreamingProcessConstants.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/StreamingProcessConstants.java
deleted file mode 100644
index ab45fbb..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/StreamingProcessConstants.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.core;
-
-public class StreamingProcessConstants {
-	public static final String EVENT_PARTITION_KEY = "eventPartitionKey";
-	public static final String EVENT_STREAM_NAME = "streamName";
-	public static final String EVENT_ATTRIBUTE_MAP = "value";
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/ValuesArray.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/ValuesArray.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/ValuesArray.java
deleted file mode 100644
index b5c26bb..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/ValuesArray.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.core;
-
-import java.util.ArrayList;
-
-/**
- * multiple datapoints are stored within one ValuesArray object
- * sent out
- */
-public class ValuesArray extends ArrayList<Object>{
-	private static final long serialVersionUID = -8218427810421668178L;
-
-	public ValuesArray() {
-        
-    }
-    
-    public ValuesArray(Object... vals) {
-        super(vals.length);
-        for(Object o: vals) {
-            add(o);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/AbstractConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/AbstractConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/AbstractConfigOptionParser.java
deleted file mode 100644
index a660430..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/AbstractConfigOptionParser.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.util;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.commons.cli.*;
-
-import java.util.Map;
-
-/**
- * @since 8/22/15
- */
-public abstract class AbstractConfigOptionParser {
-
-    private final Options options;
-    private final Parser parser;
-
-    public AbstractConfigOptionParser(){
-        parser = parser();
-        options = options();
-    }
-
-    /**
-     * @return Parser
-     */
-    protected abstract Parser parser();
-
-    /**
-     * @return Options
-     */
-    protected abstract Options options();
-
-    public abstract Map<String,String> parseConfig(String[] arguments) throws ParseException;
-
-    /**
-     * Load config as system properties
-     *
-     * @param arguments command line arguments
-     * @throws ParseException
-     */
-    public Config load(String[] arguments) throws ParseException {
-        Map<String,String> configProps = parseConfig(arguments);
-        for(Map.Entry<String,String> entry:configProps.entrySet()){
-            System.setProperty(entry.getKey(),entry.getValue());
-        }
-        System.setProperty("config.trace", "loads");
-        return ConfigFactory.load();
-    }
-
-    public CommandLine parse(String[] arguments) throws ParseException {
-        return this.parser.parse(this.options,arguments);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/ConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/ConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/ConfigOptionParser.java
deleted file mode 100644
index a217bfb..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/ConfigOptionParser.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.util;
-
-import org.apache.commons.cli.*;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-public class ConfigOptionParser extends AbstractConfigOptionParser {
-    private final static String CONFIG_OPT_FLAG = "D";
-
-    @Override
-    protected Parser parser() {
-        return new BasicParser();
-    }
-
-    @Override
-    protected Options options() {
-        Options options = new Options();
-        options.addOption(CONFIG_OPT_FLAG, true, "Config properties in format of \"-D key=value\"");
-        return options;
-    }
-
-    @Override
-    public Map<String,String> parseConfig(String[] arguments) throws ParseException {
-        CommandLine cmd = parse(arguments);
-
-        Map<String,String> result = new HashMap<>();
-        if(cmd.hasOption(CONFIG_OPT_FLAG)){
-            String[] values = cmd.getOptionValues(CONFIG_OPT_FLAG);
-            for(String value:values){
-                int eqIndex = value.indexOf("=");
-                if(eqIndex>0 && eqIndex<value.length()){
-                    String k = value.substring(0,eqIndex);
-                    String v = value.substring(eqIndex+1,value.length());
-                    if(result.containsKey(k)){
-                        throw new ParseException("Duplicated "+CONFIG_OPT_FLAG+" "+value);
-                    }else{
-                        result.put(k,v);
-                    }
-                }else{
-                    throw new ParseException("Invalid format: -"+CONFIG_OPT_FLAG+" "+value+", required: -"+CONFIG_OPT_FLAG+" key=value");
-                }
-            }
-        }
-        return result;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java
new file mode 100644
index 0000000..0c4b1ab
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.core;
+
+import java.io.Serializable;
+
+/**
+ * expose simple interface for streaming executor to populate output data
+ *
+ */
+public interface EagleOutputCollector extends Serializable{
+	void collect(ValuesArray t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
new file mode 100644
index 0000000..6933a5f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.core;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class JsonSerDeserUtils {
+	private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeserUtils.class);
+
+	public static <T> String serialize(T o) throws Exception{
+		return serialize(o, null);
+	}
+	
+	public static <T> String serialize(T o, List<Module> modules) throws Exception {
+		ObjectMapper mapper = new ObjectMapper();
+		if (modules != null) { 
+			mapper.registerModules(modules);
+		}
+		return mapper.writeValueAsString(o);
+	}
+
+	public static <T> T deserialize(String value, Class<T> cls) throws Exception{
+		return deserialize(value, cls, null);
+	}
+	
+	public static <T> T deserialize(String value, Class<T> cls, List<Module> modules) throws Exception{
+		ObjectMapper mapper = new ObjectMapper();
+		if (modules != null) { 
+			mapper.registerModules(modules);
+		}
+		return mapper.readValue(value, cls);	
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java
new file mode 100644
index 0000000..fc2a016
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.core;
+
+public class StreamingProcessConstants {
+	public static final String EVENT_PARTITION_KEY = "eventPartitionKey";
+	public static final String EVENT_STREAM_NAME = "streamName";
+	public static final String EVENT_ATTRIBUTE_MAP = "value";
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java
new file mode 100644
index 0000000..9971cb2
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.core;
+
+import java.util.ArrayList;
+
+/**
+ * multiple datapoints are stored within one ValuesArray object
+ * sent out
+ */
+public class ValuesArray extends ArrayList<Object>{
+	private static final long serialVersionUID = -8218427810421668178L;
+
+	public ValuesArray() {
+        
+    }
+    
+    public ValuesArray(Object... vals) {
+        super(vals.length);
+        for(Object o: vals) {
+            add(o);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java
new file mode 100644
index 0000000..19853ef
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.util;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.commons.cli.*;
+
+import java.util.Map;
+
+/**
+ * @since 8/22/15
+ */
+public abstract class AbstractConfigOptionParser {
+
+    private final Options options;
+    private final Parser parser;
+
+    public AbstractConfigOptionParser(){
+        parser = parser();
+        options = options();
+    }
+
+    /**
+     * @return Parser
+     */
+    protected abstract Parser parser();
+
+    /**
+     * @return Options
+     */
+    protected abstract Options options();
+
+    public abstract Map<String,String> parseConfig(String[] arguments) throws ParseException;
+
+    /**
+     * Load config as system properties
+     *
+     * @param arguments command line arguments
+     * @throws ParseException
+     */
+    public Config load(String[] arguments) throws ParseException {
+        Map<String,String> configProps = parseConfig(arguments);
+        for(Map.Entry<String,String> entry:configProps.entrySet()){
+            System.setProperty(entry.getKey(),entry.getValue());
+        }
+        System.setProperty("config.trace", "loads");
+        return ConfigFactory.load();
+    }
+
+    public CommandLine parse(String[] arguments) throws ParseException {
+        return this.parser.parse(this.options,arguments);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java
new file mode 100644
index 0000000..0bd7559
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.util;
+
+import org.apache.commons.cli.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class ConfigOptionParser extends AbstractConfigOptionParser {
+    private final static String CONFIG_OPT_FLAG = "D";
+
+    @Override
+    protected Parser parser() {
+        return new BasicParser();
+    }
+
+    @Override
+    protected Options options() {
+        Options options = new Options();
+        options.addOption(CONFIG_OPT_FLAG, true, "Config properties in format of \"-D key=value\"");
+        return options;
+    }
+
+    @Override
+    public Map<String,String> parseConfig(String[] arguments) throws ParseException {
+        CommandLine cmd = parse(arguments);
+
+        Map<String,String> result = new HashMap<>();
+        if(cmd.hasOption(CONFIG_OPT_FLAG)){
+            String[] values = cmd.getOptionValues(CONFIG_OPT_FLAG);
+            for(String value:values){
+                int eqIndex = value.indexOf("=");
+                if(eqIndex>0 && eqIndex<value.length()){
+                    String k = value.substring(0,eqIndex);
+                    String v = value.substring(eqIndex+1,value.length());
+                    if(result.containsKey(k)){
+                        throw new ParseException("Duplicated "+CONFIG_OPT_FLAG+" "+value);
+                    }else{
+                        result.put(k,v);
+                    }
+                }else{
+                    throw new ParseException("Invalid format: -"+CONFIG_OPT_FLAG+" "+value+", required: -"+CONFIG_OPT_FLAG+" key=value");
+                }
+            }
+        }
+        return result;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/Collector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/Collector.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/Collector.scala
deleted file mode 100644
index bbb5f3f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/Collector.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-trait Collector[R] {
-  def collect(r : R);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/EagleTuple.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/EagleTuple.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/EagleTuple.scala
deleted file mode 100644
index 0225329..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/EagleTuple.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-trait EagleTuple extends Serializable{
-  def getList : List[AnyRef]
-}
-
-case class Tuple1[T0](f0 : T0) extends EagleTuple{
-  override def getList : List[AnyRef] = {
-    return List(f0.asInstanceOf[AnyRef])
-  }
-}
-
-case class Tuple2[T0, T1](f0 : T0, f1: T1) extends EagleTuple{
-  override def getList : List[AnyRef] = {
-    return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef])
-  }
-}
-
-case class Tuple3[T0, T1, T2](f0 : T0, f1: T1, f2: T2) extends EagleTuple{
-  override def getList : List[AnyRef] = {
-    return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef])
-  }
-}
-
-case class Tuple4[T0, T1, T2, T3](f0 : T0, f1: T1, f2: T2, f3 : T3) extends EagleTuple{
-  override def getList : List[AnyRef] = {
-    return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef], f3.asInstanceOf[AnyRef])
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/FlatMapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/FlatMapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/FlatMapper.scala
deleted file mode 100644
index 488ba9f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/FlatMapper.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-trait FlatMapper[T, R] extends Serializable{
-  def flatMap(input : T, collector : Collector[R])
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/StormStreamExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/StormStreamExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/StormStreamExecutor.scala
deleted file mode 100644
index 73b7a3e..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/StormStreamExecutor.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-import com.typesafe.config.Config
-
-trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
-  def prepareConfig(config : Config)
-  def init
-  def fields : Array[String]
-}
-
-trait JavaStormStreamExecutor[R <: EagleTuple] extends FlatMapper[java.util.List[AnyRef], R] {
-  def prepareConfig(config : Config)
-  def init
-  def fields : Array[String]
-  override def toString() = this.getClass.getSimpleName
-}
-
-abstract class StormStreamExecutor1[T0] extends StormStreamExecutor[Tuple1[T0]] {
-  override def fields = Array("f0")
-}
-
-abstract class JavaStormStreamExecutor1[T0] extends JavaStormStreamExecutor[Tuple1[T0]] {
-  override def fields = Array("f0")
-}
-
-abstract class  StormStreamExecutor2[T0, T1] extends StormStreamExecutor[Tuple2[T0, T1]] {
-  override def fields = Array("f0", "f1")
-}
-
-abstract class  JavaStormStreamExecutor2[T0, T1] extends JavaStormStreamExecutor[Tuple2[T0, T1]] {
-  override def fields = Array("f0", "f1")
-}
-
-abstract class  StormStreamExecutor3[T0, T1, T2] extends StormStreamExecutor[Tuple3[T0, T1, T2]] {
-  override def fields = Array("f0", "f1", "f2")
-}
-
-abstract class  JavaStormStreamExecutor3[T0, T1, T2] extends JavaStormStreamExecutor[Tuple3[T0, T1, T2]] {
-  override def fields = Array("f0", "f1", "f2")
-}
-
-abstract class  StormStreamExecutor4[T0, T1, T2, T3] extends StormStreamExecutor[Tuple4[T0, T1, T2, T3]] {
-  override def fields = Array("f0", "f1", "f2", "f3")
-}
-
-abstract class  JavaStormStreamExecutor4[T0, T1, T2, T3] extends JavaStormStreamExecutor[Tuple4[T0, T1, T2, T3]] {
-  override def fields = Array("f0", "f1", "f2", "f3")
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/Collector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/Collector.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/Collector.scala
new file mode 100644
index 0000000..2532063
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/Collector.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+trait Collector[R] {
+  def collect(r : R);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
new file mode 100644
index 0000000..5c89a41
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+trait EagleTuple extends Serializable{
+  def getList : List[AnyRef]
+}
+
+case class Tuple1[T0](f0 : T0) extends EagleTuple{
+  override def getList : List[AnyRef] = {
+    return List(f0.asInstanceOf[AnyRef])
+  }
+}
+
+case class Tuple2[T0, T1](f0 : T0, f1: T1) extends EagleTuple{
+  override def getList : List[AnyRef] = {
+    return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef])
+  }
+}
+
+case class Tuple3[T0, T1, T2](f0 : T0, f1: T1, f2: T2) extends EagleTuple{
+  override def getList : List[AnyRef] = {
+    return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef])
+  }
+}
+
+case class Tuple4[T0, T1, T2, T3](f0 : T0, f1: T1, f2: T2, f3 : T3) extends EagleTuple{
+  override def getList : List[AnyRef] = {
+    return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef], f3.asInstanceOf[AnyRef])
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
new file mode 100644
index 0000000..22e063d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+trait FlatMapper[T, R] extends Serializable{
+  def flatMap(input : T, collector : Collector[R])
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
new file mode 100644
index 0000000..b3d77e9
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import com.typesafe.config.Config
+
+trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
+  def prepareConfig(config : Config)
+  def init
+  def fields : Array[String]
+}
+
+trait JavaStormStreamExecutor[R <: EagleTuple] extends FlatMapper[java.util.List[AnyRef], R] {
+  def prepareConfig(config : Config)
+  def init
+  def fields : Array[String]
+  override def toString() = this.getClass.getSimpleName
+}
+
+abstract class StormStreamExecutor1[T0] extends StormStreamExecutor[Tuple1[T0]] {
+  override def fields = Array("f0")
+}
+
+abstract class JavaStormStreamExecutor1[T0] extends JavaStormStreamExecutor[Tuple1[T0]] {
+  override def fields = Array("f0")
+}
+
+abstract class  StormStreamExecutor2[T0, T1] extends StormStreamExecutor[Tuple2[T0, T1]] {
+  override def fields = Array("f0", "f1")
+}
+
+abstract class  JavaStormStreamExecutor2[T0, T1] extends JavaStormStreamExecutor[Tuple2[T0, T1]] {
+  override def fields = Array("f0", "f1")
+}
+
+abstract class  StormStreamExecutor3[T0, T1, T2] extends StormStreamExecutor[Tuple3[T0, T1, T2]] {
+  override def fields = Array("f0", "f1", "f2")
+}
+
+abstract class  JavaStormStreamExecutor3[T0, T1, T2] extends JavaStormStreamExecutor[Tuple3[T0, T1, T2]] {
+  override def fields = Array("f0", "f1", "f2")
+}
+
+abstract class  StormStreamExecutor4[T0, T1, T2, T3] extends StormStreamExecutor[Tuple4[T0, T1, T2, T3]] {
+  override def fields = Array("f0", "f1", "f2", "f3")
+}
+
+abstract class  JavaStormStreamExecutor4[T0, T1, T2, T3] extends JavaStormStreamExecutor[Tuple4[T0, T1, T2, T3]] {
+  override def fields = Array("f0", "f1", "f2", "f3")
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/EmbeddedHbase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/EmbeddedHbase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/EmbeddedHbase.java
deleted file mode 100644
index 980d410..0000000
--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/EmbeddedHbase.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.service.hbase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class EmbeddedHbase {
-    private HBaseTestingUtility util;
-    private MiniHBaseCluster hBaseCluster;
-    private static EmbeddedHbase hbase;
-    private int port;    
-    private String znode;
-    private static int DEFAULT_PORT = 2181;
-    private static String DEFAULT_ZNODE = "/hbase-unsecure";
-	private static final Logger LOG = LoggerFactory.getLogger(EmbeddedHbase.class);
-	
-    private EmbeddedHbase(int port, String znode) {
-    	this.port = port;
-    	this.znode = znode;    	
-    }
-    
-    private EmbeddedHbase(int port) {
-    	this(port, DEFAULT_ZNODE);
-    }
-    
-    public static EmbeddedHbase getInstance() {
-    	if (hbase == null) {
-    		synchronized(EmbeddedHbase.class) {
-    			if (hbase == null) {
-    				hbase = new EmbeddedHbase();
-    				hbase.start();   						
-    			}
-    		}
-    	}
-    	return hbase;
-    }
-    
-    private EmbeddedHbase() {
-    	this(DEFAULT_PORT, DEFAULT_ZNODE);
-    }
-
-    public void start() {
-    	try {
-	    	util = new HBaseTestingUtility();
-	        Configuration conf= util.getConfiguration();
-	        conf.setInt("test.hbase.zookeeper.property.clientPort", port);
-	        conf.set("zookeeper.znode.parent", znode);
-	        conf.setInt("hbase.zookeeper.property.maxClientCnxns", 200);
-	        conf.setInt("hbase.master.info.port", -1);//avoid port clobbering
-	        // start mini hbase cluster
-	        hBaseCluster = util.startMiniCluster();
-	        Configuration config = hBaseCluster.getConf();
-	        
-	        config.set("zookeeper.session.timeout", "120000");
-	        config.set("hbase.zookeeper.property.tickTime", "6000");
-	        config.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
-	        config.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "1");
-	        config.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
-	        
-	        Runtime.getRuntime().addShutdownHook(new Thread() {
-	            @Override
-	            public void run() {
-	            	shutdown();
-	            }
-	        }); 
-    	}
-    	catch (Throwable t) {
-    		LOG.error("Got an exception: ",t);
-    	}
-    }
-
-    public void shutdown() {    	
-    	try {
-            util.shutdownMiniCluster();
-        }
-    	catch (Throwable t) {
-    		LOG.info("Got an exception, " + t , t.getCause());
-    		try {
-                util.shutdownMiniCluster();
-    		}
-    		catch (Throwable t1) {
-    		}
-    	}
-    }
-    
-    public void createTable(String tableName, String cf) {
-    	try {    		
-    		util.createTable(tableName, cf);
-    	}
-    	catch (Exception ex) {
-    		LOG.warn("Create table failed, probably table already existed, table name: " + tableName);
-    	}
-    }
-    
-    public void deleteTable(String tableName){
-    	try {
-    		util.deleteTable(tableName);
-    	}
-    	catch (Exception ex) {
-    		LOG.warn("Delete table failed, probably table not existed, table name: " + tableName);
-    	}
-    }
-
-    public static void main(String[] args){
-        EmbeddedHbase hbase = new EmbeddedHbase();
-        hbase.start();
-        for(String table : new Tables().getTables()){
-            hbase.createTable(table, "f");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/Tables.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/Tables.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/Tables.java
deleted file mode 100644
index 43bdbc6..0000000
--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/Tables.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.service.hbase;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class Tables {
-    List<String> tables = new ArrayList<String>();
-    public Tables(){
-        tables.add("eagle_metric");
-
-        tables.add("actiondetail");
-        tables.add("alertdetail");
-        tables.add("alertgroup");
-        tables.add("alertmeta");
-        tables.add("alertMetaEntity");
-
-        // for alert framework
-        tables.add("alertDataSource");
-        tables.add("alertStream");
-        tables.add("alertExecutor");
-        tables.add("alertStreamSchema");
-        tables.add("alertdef");
-
-        // for security
-        tables.add("hiveResourceSensitivity");
-        tables.add("fileSensitivity");
-        tables.add("mlmodel");
-        tables.add("userprofile");
-    }
-
-    public List<String> getTables(){
-        return this.tables;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
new file mode 100644
index 0000000..c204fac
--- /dev/null
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class EmbeddedHbase {
+    private HBaseTestingUtility util;
+    private MiniHBaseCluster hBaseCluster;
+    private static EmbeddedHbase hbase;
+    private int port;    
+    private String znode;
+    private static int DEFAULT_PORT = 2181;
+    private static String DEFAULT_ZNODE = "/hbase-unsecure";
+	private static final Logger LOG = LoggerFactory.getLogger(EmbeddedHbase.class);
+	
+    private EmbeddedHbase(int port, String znode) {
+    	this.port = port;
+    	this.znode = znode;    	
+    }
+    
+    private EmbeddedHbase(int port) {
+    	this(port, DEFAULT_ZNODE);
+    }
+    
+    public static EmbeddedHbase getInstance() {
+    	if (hbase == null) {
+    		synchronized(EmbeddedHbase.class) {
+    			if (hbase == null) {
+    				hbase = new EmbeddedHbase();
+    				hbase.start();   						
+    			}
+    		}
+    	}
+    	return hbase;
+    }
+    
+    private EmbeddedHbase() {
+    	this(DEFAULT_PORT, DEFAULT_ZNODE);
+    }
+
+    public void start() {
+    	try {
+	    	util = new HBaseTestingUtility();
+	        Configuration conf= util.getConfiguration();
+	        conf.setInt("test.hbase.zookeeper.property.clientPort", port);
+	        conf.set("zookeeper.znode.parent", znode);
+	        conf.setInt("hbase.zookeeper.property.maxClientCnxns", 200);
+	        conf.setInt("hbase.master.info.port", -1);//avoid port clobbering
+	        // start mini hbase cluster
+	        hBaseCluster = util.startMiniCluster();
+	        Configuration config = hBaseCluster.getConf();
+	        
+	        config.set("zookeeper.session.timeout", "120000");
+	        config.set("hbase.zookeeper.property.tickTime", "6000");
+	        config.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
+	        config.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "1");
+	        config.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
+	        
+	        Runtime.getRuntime().addShutdownHook(new Thread() {
+	            @Override
+	            public void run() {
+	            	shutdown();
+	            }
+	        }); 
+    	}
+    	catch (Throwable t) {
+    		LOG.error("Got an exception: ",t);
+    	}
+    }
+
+    public void shutdown() {    	
+    	try {
+            util.shutdownMiniCluster();
+        }
+    	catch (Throwable t) {
+    		LOG.info("Got an exception, " + t , t.getCause());
+    		try {
+                util.shutdownMiniCluster();
+    		}
+    		catch (Throwable t1) {
+    		}
+    	}
+    }
+    
+    public void createTable(String tableName, String cf) {
+    	try {    		
+    		util.createTable(tableName, cf);
+    	}
+    	catch (Exception ex) {
+    		LOG.warn("Create table failed, probably table already existed, table name: " + tableName);
+    	}
+    }
+    
+    public void deleteTable(String tableName){
+    	try {
+    		util.deleteTable(tableName);
+    	}
+    	catch (Exception ex) {
+    		LOG.warn("Delete table failed, probably table not existed, table name: " + tableName);
+    	}
+    }
+
+    public static void main(String[] args){
+        EmbeddedHbase hbase = new EmbeddedHbase();
+        hbase.start();
+        for(String table : new Tables().getTables()){
+            hbase.createTable(table, "f");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
new file mode 100644
index 0000000..a31e27e
--- /dev/null
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.hbase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Tables {
+    List<String> tables = new ArrayList<String>();
+    public Tables(){
+        tables.add("eagle_metric");
+
+        tables.add("actiondetail");
+        tables.add("alertdetail");
+        tables.add("alertgroup");
+        tables.add("alertmeta");
+        tables.add("alertMetaEntity");
+
+        // for alert framework
+        tables.add("alertDataSource");
+        tables.add("alertStream");
+        tables.add("alertExecutor");
+        tables.add("alertStreamSchema");
+        tables.add("alertdef");
+
+        // for security
+        tables.add("hiveResourceSensitivity");
+        tables.add("fileSensitivity");
+        tables.add("mlmodel");
+        tables.add("userprofile");
+    }
+
+    public List<String> getTables(){
+        return this.tables;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/eagle/service/hbase/TestHBaseBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/eagle/service/hbase/TestHBaseBase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/eagle/service/hbase/TestHBaseBase.java
deleted file mode 100644
index fc15f70..0000000
--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/eagle/service/hbase/TestHBaseBase.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.service.hbase;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestHBaseBase {
-    protected static EmbeddedHbase hbase;
-
-    @BeforeClass
-    public static void setUpHBase() {
-        hbase = EmbeddedHbase.getInstance();
-    }
-
-    @Test
-    public void test() {
-
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
new file mode 100644
index 0000000..87fffcd
--- /dev/null
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.hbase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHBaseBase {
+    protected static EmbeddedHbase hbase;
+
+    @BeforeClass
+    public static void setUpHBase() {
+        hbase = EmbeddedHbase.getInstance();
+    }
+
+    @Test
+    public void test() {
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-server/runEmbeddedServer.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-server/runEmbeddedServer.sh b/eagle-core/eagle-embed/eagle-embed-server/runEmbeddedServer.sh
index 39fd45a..c3192c2 100755
--- a/eagle-core/eagle-embed/eagle-embed-server/runEmbeddedServer.sh
+++ b/eagle-core/eagle-embed/eagle-embed-server/runEmbeddedServer.sh
@@ -1,4 +1,4 @@
 #!/bin/bash
 
 export JAVA_HOME=$(/usr/libexec/java_home -v 1.7)
-mvn -X exec:java -Dexec.mainClass="eagle.service.embedded.tomcat.EmbeddedServer" -Dexec.args="../../../eagle-security/eagle-security-webservice/target/eagle-service 38080"
\ No newline at end of file
+mvn -X exec:java -Dexec.mainClass="org.apache.eagle.service.embedded.tomcat.EmbeddedServer" -Dexec.args="../../../eagle-security/eagle-security-webservice/target/eagle-service 38080"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-server/src/main/java/eagle/service/embedded/tomcat/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-server/src/main/java/eagle/service/embedded/tomcat/EmbeddedServer.java b/eagle-core/eagle-embed/eagle-embed-server/src/main/java/eagle/service/embedded/tomcat/EmbeddedServer.java
deleted file mode 100644
index 02e7988..0000000
--- a/eagle-core/eagle-embed/eagle-embed-server/src/main/java/eagle/service/embedded/tomcat/EmbeddedServer.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.service.embedded.tomcat;
-
-import java.io.File;
-
-import org.apache.catalina.LifecycleState;
-import org.apache.catalina.startup.Tomcat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EmbeddedServer {
-	
-	private static EmbeddedServer server;
-	private Tomcat tomcat;
-	private String webappDirLocation;
-	private int port;
-	private static int DEFAULT_TOMCAT_PORT = 38080;
-	private static final Logger LOG = LoggerFactory.getLogger(EmbeddedServer.class);
-		
-	public static void main(String[] args) {
-        // args: webappLocation, port
-        int tomcatPort;
-        if (args.length > 1) {
-            tomcatPort = Integer.valueOf(args[1]);
-        } else {
-            tomcatPort = DEFAULT_TOMCAT_PORT;
-        }
-        new EmbeddedServer(args[0], tomcatPort).start();
-	}
-
-	private EmbeddedServer(String webappDirLocation) {
-		this(webappDirLocation, DEFAULT_TOMCAT_PORT);
-	}
-	
-	private EmbeddedServer(String webappDirLocation, int port) {
-		this.webappDirLocation = webappDirLocation;
-		this.port = port;
-	}
-	
-    public static EmbeddedServer getInstance(String webappDirLocation) {
-    	if (server == null) {
-    		synchronized(EmbeddedServer.class) {
-    			if (server == null) {
-    				server = new EmbeddedServer(webappDirLocation);
-    				server.start();   						
-    			}
-    		}
-    	}
-    	return server;
-    }
-
-	public int getPort() {
-		return port;
-	}
-	
-	public void start() {
-		tomcat = new Tomcat();
-		tomcat.setHostname("localhost");
-		tomcat.setPort(port);
-		try {
-			tomcat.addWebapp("/eagle-service", new File(webappDirLocation).getAbsolutePath());
-			tomcat.start();
-
-		} catch (Exception ex) {
-			LOG.error("Got an exception " + ex.getMessage());
-		}
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-            	try {            		
-            		shutdown();
-            	}
-            	catch (Throwable t) {
-            		LOG.error("Got an exception why shutting down..." + t.getMessage());
-            	}
-            }
-        });
-		try {
-			Thread.sleep(10000000);
-		}catch(Exception ex){
-			ex.printStackTrace();
-		}
-	}
-	
-	public void shutdown() throws Throwable {
-	  	if (tomcat.getServer() != null && tomcat.getServer().getState() != LifecycleState.DESTROYED) {
-	        if (tomcat.getServer().getState() != LifecycleState.STOPPED) {
-	        	tomcat.stop();
-	        }
-	        tomcat.destroy();
-	    }
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-server/src/main/java/org/apache/eagle/service/embedded/tomcat/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-server/src/main/java/org/apache/eagle/service/embedded/tomcat/EmbeddedServer.java b/eagle-core/eagle-embed/eagle-embed-server/src/main/java/org/apache/eagle/service/embedded/tomcat/EmbeddedServer.java
new file mode 100644
index 0000000..35d3967
--- /dev/null
+++ b/eagle-core/eagle-embed/eagle-embed-server/src/main/java/org/apache/eagle/service/embedded/tomcat/EmbeddedServer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.embedded.tomcat;
+
+import java.io.File;
+
+import org.apache.catalina.LifecycleState;
+import org.apache.catalina.startup.Tomcat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EmbeddedServer {
+	
+	private static EmbeddedServer server;
+	private Tomcat tomcat;
+	private String webappDirLocation;
+	private int port;
+	private static int DEFAULT_TOMCAT_PORT = 38080;
+	private static final Logger LOG = LoggerFactory.getLogger(EmbeddedServer.class);
+		
+	public static void main(String[] args) {
+        // args: webappLocation, port
+        int tomcatPort;
+        if (args.length > 1) {
+            tomcatPort = Integer.valueOf(args[1]);
+        } else {
+            tomcatPort = DEFAULT_TOMCAT_PORT;
+        }
+        new EmbeddedServer(args[0], tomcatPort).start();
+	}
+
+	private EmbeddedServer(String webappDirLocation) {
+		this(webappDirLocation, DEFAULT_TOMCAT_PORT);
+	}
+	
+	private EmbeddedServer(String webappDirLocation, int port) {
+		this.webappDirLocation = webappDirLocation;
+		this.port = port;
+	}
+	
+    public static EmbeddedServer getInstance(String webappDirLocation) {
+    	if (server == null) {
+    		synchronized(EmbeddedServer.class) {
+    			if (server == null) {
+    				server = new EmbeddedServer(webappDirLocation);
+    				server.start();   						
+    			}
+    		}
+    	}
+    	return server;
+    }
+
+	public int getPort() {
+		return port;
+	}
+	
+	public void start() {
+		tomcat = new Tomcat();
+		tomcat.setHostname("localhost");
+		tomcat.setPort(port);
+		try {
+			tomcat.addWebapp("/eagle-service", new File(webappDirLocation).getAbsolutePath());
+			tomcat.start();
+
+		} catch (Exception ex) {
+			LOG.error("Got an exception " + ex.getMessage());
+		}
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+            	try {            		
+            		shutdown();
+            	}
+            	catch (Throwable t) {
+            		LOG.error("Got an exception why shutting down..." + t.getMessage());
+            	}
+            }
+        });
+		try {
+			Thread.sleep(10000000);
+		}catch(Exception ex){
+			ex.printStackTrace();
+		}
+	}
+	
+	public void shutdown() throws Throwable {
+	  	if (tomcat.getServer() != null && tomcat.getServer().getState() != LifecycleState.DESTROYED) {
+	        if (tomcat.getServer().getState() != LifecycleState.STOPPED) {
+	        	tomcat.stop();
+	        }
+	        tomcat.destroy();
+	    }
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-server/src/test/java/eagle/service/embedded/tomcat/TestEmbeddedServer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-server/src/test/java/eagle/service/embedded/tomcat/TestEmbeddedServer.java b/eagle-core/eagle-embed/eagle-embed-server/src/test/java/eagle/service/embedded/tomcat/TestEmbeddedServer.java
deleted file mode 100644
index 6039d85..0000000
--- a/eagle-core/eagle-embed/eagle-embed-server/src/test/java/eagle/service/embedded/tomcat/TestEmbeddedServer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.service.embedded.tomcat;
-
-public class TestEmbeddedServer {
-	
-	//@Test
-	public void test() throws Throwable{
-		String webappDirLocation = "../../../eagle-webservice/target/eagle-service";
-		EmbeddedServer.getInstance(webappDirLocation);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-server/src/test/java/org/apache/eagle/service/embedded/tomcat/TestEmbeddedServer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-server/src/test/java/org/apache/eagle/service/embedded/tomcat/TestEmbeddedServer.java b/eagle-core/eagle-embed/eagle-embed-server/src/test/java/org/apache/eagle/service/embedded/tomcat/TestEmbeddedServer.java
new file mode 100644
index 0000000..1351ac7
--- /dev/null
+++ b/eagle-core/eagle-embed/eagle-embed-server/src/test/java/org/apache/eagle/service/embedded/tomcat/TestEmbeddedServer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.embedded.tomcat;
+
+public class TestEmbeddedServer {
+	
+	//@Test
+	public void test() throws Throwable{
+		String webappDirLocation = "../../../eagle-webservice/target/eagle-service";
+		EmbeddedServer.getInstance(webappDirLocation);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/eagle/ml/MLAlgorithmEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/eagle/ml/MLAlgorithmEvaluator.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/eagle/ml/MLAlgorithmEvaluator.java
deleted file mode 100644
index 4f10fc2..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/eagle/ml/MLAlgorithmEvaluator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.ml;
-
-import eagle.dataproc.core.ValuesArray;
-import eagle.ml.model.MLAlgorithm;
-import com.typesafe.config.Config;
-
-/**
- * Machine Learning Algorithm Evaluator
- */
-public interface MLAlgorithmEvaluator {
-    /**
-     * Prepare Machine learning algorithm
-     *
-     * @param algorithm MLAlgorithm instance
-     */
-    public void init(MLAlgorithm algorithm, Config config);
-
-    /**
-     * Evaluate input user profile model
-     *
-     * @param data ValuesArray
-     * @throws Exception
-     */
-	public void evaluate(ValuesArray data) throws Exception;
-
-    /**
-     * Register callback
-     *
-     * @param callbackObj MachineLearningCallback
-     * @throws Exception
-     */
-	public void register(MLAnomalyCallback callbackObj) throws Exception;
-}
\ No newline at end of file