You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:41 UTC
[34/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46]
Rename package name as "org.apache.eagle"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala
new file mode 100644
index 0000000..48dc7e5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStreamDAGBuilder.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{FlatSpec, Matchers}
+
+class TestStreamDAGBuilder extends FlatSpec with Matchers{
+// "a single source DAG with groupBy" should "be traversed without groupBy node" in {
+// val config = ConfigFactory.load()
+// val env = ExecutionEnvironmentFactory.getStorm(config)
+// val tail = env.newSource(null).flatMap(EchoExecutor()).groupBy(0).flatMap(WordPrependExecutor("test"))
+// val dag = new StreamDAGBuilder(env.heads).build()
+// val iter = dag.iterator()
+// assert(iter.hasNext)
+// iter.next() match{
+// case StormSourceProducer(t) => assert(t == null)
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match{
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match{
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+// case _ => assert(false)
+// }
+// }
+//
+// "a single source DAG with groupBy from spout" should "be traversed without groupBy node" in {
+// val config = ConfigFactory.load()
+// val env = ExecutionEnvironmentFactory.getStorm(config)
+// val tail = env.newSource(null).groupBy(0).flatMap(WordPrependExecutor("test"))
+// val dag = new StreamDAGBuilder(env.heads).build()
+// val iter = dag.iterator()
+// assert(iter.hasNext)
+// iter.next() match{
+// case StormSourceProducer(t) => assert(t == null)
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match{
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+// case _ => assert(false)
+// }
+// }
+//
+// "a single source DAG with groupBy from spout and then split" should "be traversed without groupBy node" in {
+// val config = ConfigFactory.load()
+// val env = ExecutionEnvironmentFactory.getStorm(config)
+// val groupby = env.newSource(null).groupBy(0)
+// groupby.flatMap(WordPrependExecutor("test"))
+// groupby.flatMap(WordAppendExecutor("test"))
+// val dag = new StreamDAGBuilder(env.heads).build()
+// val iter = dag.iterator()
+// assert(iter.hasNext)
+// iter.next() match{
+// case StormSourceProducer(t) => assert(t == null)
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match{
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match{
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
+// case _ => assert(false)
+// }
+// }
+//
+// "a single source DAG without stream join" should "be traversed sequentially like specified" in{
+// val config = ConfigFactory.load()
+// val env = ExecutionEnvironmentFactory.getStorm(config)
+// val tail = env.newSource(null).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test"))
+// val dag = new StreamDAGBuilder(env.heads).build()
+// val iter = dag.iterator()
+// assert(iter.hasNext)
+// iter.next() match{
+// case StormSourceProducer(t) => assert(t == null)
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match{
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match{
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+// case _ => assert(false)
+// }
+// }
+//
+// "a single source with split" should "has more than one tail producer" in {
+// val config = ConfigFactory.load()
+// val env = ExecutionEnvironmentFactory.getStorm(config)
+// val echo = env.newSource(null).flatMap(EchoExecutor())
+// val tail1 = echo.flatMap(WordPrependExecutor("test"))
+// val tail2 = echo.flatMap(WordAppendExecutor("test"))
+// val dag = new StreamDAGBuilder(env.heads).build()
+// val iter = dag.iterator()
+// assert(iter.hasNext)
+// iter.next() match {
+// case StormSourceProducer(t) => assert(t == null)
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
+// case _ => assert(false)
+// }
+// }
+//
+// "a single source with split and join" should "has join" in {
+// val config = ConfigFactory.load()
+// val env = ExecutionEnvironmentFactory.getStorm(config)
+// val echo = env.newSource(null).flatMap(EchoExecutor())
+// val tail1 = echo.flatMap(WordPrependExecutor("test"))
+// val tail2 = echo.flatMap(WordAppendExecutor("test")).filter(_=>true).streamUnion(List(tail1)).
+// flatMap(PatternAlertExecutor("test*"))
+// val dag = new StreamDAGBuilder(env.heads).build()
+// val iter = dag.iterator()
+// assert(iter.hasNext)
+// iter.next() match {
+// case StormSourceProducer(t) => assert(t == null)
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[EchoExecutor])
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case FilterProducer(fn) =>
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case FlatMapProducer(worker) => assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("test*"))
+// case _ => assert(false)
+// }
+// assert(!iter.hasNext)
+// }
+//
+// "multiple sources with split and union" should "has union" in {
+// val config = ConfigFactory.load()
+// val env = ExecutionEnvironmentFactory.getStorm(config)
+// val source1 = env.newSource(TestSpout())
+// val source2 = env.newSource(TestSpout())
+// val source3 = env.newSource(TestSpout())
+//
+// val tail1 = source1.flatMap(WordPrependExecutor("test"))
+// val tail2 = source2.filter(_=>true)
+// val tail3 = source3.flatMap(WordAppendExecutor("test")).streamUnion(List(tail1, tail2)).
+// flatMap(PatternAlertExecutor("abc*"))
+//
+// val dag = new StreamDAGBuilder(env.heads).build()
+// val iter = dag.iterator()
+//
+// assert(iter.hasNext)
+// iter.next() match {
+// case StormSourceProducer(t) => assert(t.isInstanceOf[TestSpout])
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordPrependExecutor])
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case StormSourceProducer(t) => assert(t.isInstanceOf[String])
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case FilterProducer(fn) =>
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case SourceProducer(t) => assert(t.isInstanceOf[String])
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case FlatMapProducer(worker) => assert(worker.isInstanceOf[WordAppendExecutor])
+// case _ => assert(false)
+// }
+// assert(iter.hasNext)
+// iter.next() match {
+// case FlatMapProducer(worker) => assert(worker.asInstanceOf[PatternAlertExecutor].pattern.equals("abc*"))
+// case _ => assert(false)
+// }
+// assert(!iter.hasNext)
+// }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/EagleOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/EagleOutputCollector.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/EagleOutputCollector.java
deleted file mode 100644
index ef4e0cb..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/EagleOutputCollector.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.core;
-
-import java.io.Serializable;
-
-/**
- * expose simple interface for streaming executor to populate output data
- *
- */
-public interface EagleOutputCollector extends Serializable{
- void collect(ValuesArray t);
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/JsonSerDeserUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/JsonSerDeserUtils.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/JsonSerDeserUtils.java
deleted file mode 100644
index 69dff5d..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/JsonSerDeserUtils.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.core;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public class JsonSerDeserUtils {
- private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeserUtils.class);
-
- public static <T> String serialize(T o) throws Exception{
- return serialize(o, null);
- }
-
- public static <T> String serialize(T o, List<Module> modules) throws Exception {
- ObjectMapper mapper = new ObjectMapper();
- if (modules != null) {
- mapper.registerModules(modules);
- }
- return mapper.writeValueAsString(o);
- }
-
- public static <T> T deserialize(String value, Class<T> cls) throws Exception{
- return deserialize(value, cls, null);
- }
-
- public static <T> T deserialize(String value, Class<T> cls, List<Module> modules) throws Exception{
- ObjectMapper mapper = new ObjectMapper();
- if (modules != null) {
- mapper.registerModules(modules);
- }
- return mapper.readValue(value, cls);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/StreamingProcessConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/StreamingProcessConstants.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/StreamingProcessConstants.java
deleted file mode 100644
index ab45fbb..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/StreamingProcessConstants.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.core;
-
-public class StreamingProcessConstants {
- public static final String EVENT_PARTITION_KEY = "eventPartitionKey";
- public static final String EVENT_STREAM_NAME = "streamName";
- public static final String EVENT_ATTRIBUTE_MAP = "value";
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/ValuesArray.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/ValuesArray.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/ValuesArray.java
deleted file mode 100644
index b5c26bb..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/core/ValuesArray.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.core;
-
-import java.util.ArrayList;
-
-/**
- * multiple datapoints are stored within one ValuesArray object
- * sent out
- */
-public class ValuesArray extends ArrayList<Object>{
- private static final long serialVersionUID = -8218427810421668178L;
-
- public ValuesArray() {
-
- }
-
- public ValuesArray(Object... vals) {
- super(vals.length);
- for(Object o: vals) {
- add(o);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/AbstractConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/AbstractConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/AbstractConfigOptionParser.java
deleted file mode 100644
index a660430..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/AbstractConfigOptionParser.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.util;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.commons.cli.*;
-
-import java.util.Map;
-
-/**
- * @since 8/22/15
- */
-public abstract class AbstractConfigOptionParser {
-
- private final Options options;
- private final Parser parser;
-
- public AbstractConfigOptionParser(){
- parser = parser();
- options = options();
- }
-
- /**
- * @return Parser
- */
- protected abstract Parser parser();
-
- /**
- * @return Options
- */
- protected abstract Options options();
-
- public abstract Map<String,String> parseConfig(String[] arguments) throws ParseException;
-
- /**
- * Load config as system properties
- *
- * @param arguments command line arguments
- * @throws ParseException
- */
- public Config load(String[] arguments) throws ParseException {
- Map<String,String> configProps = parseConfig(arguments);
- for(Map.Entry<String,String> entry:configProps.entrySet()){
- System.setProperty(entry.getKey(),entry.getValue());
- }
- System.setProperty("config.trace", "loads");
- return ConfigFactory.load();
- }
-
- public CommandLine parse(String[] arguments) throws ParseException {
- return this.parser.parse(this.options,arguments);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/ConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/ConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/ConfigOptionParser.java
deleted file mode 100644
index a217bfb..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/eagle/dataproc/util/ConfigOptionParser.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.dataproc.util;
-
-import org.apache.commons.cli.*;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-public class ConfigOptionParser extends AbstractConfigOptionParser {
- private final static String CONFIG_OPT_FLAG = "D";
-
- @Override
- protected Parser parser() {
- return new BasicParser();
- }
-
- @Override
- protected Options options() {
- Options options = new Options();
- options.addOption(CONFIG_OPT_FLAG, true, "Config properties in format of \"-D key=value\"");
- return options;
- }
-
- @Override
- public Map<String,String> parseConfig(String[] arguments) throws ParseException {
- CommandLine cmd = parse(arguments);
-
- Map<String,String> result = new HashMap<>();
- if(cmd.hasOption(CONFIG_OPT_FLAG)){
- String[] values = cmd.getOptionValues(CONFIG_OPT_FLAG);
- for(String value:values){
- int eqIndex = value.indexOf("=");
- if(eqIndex>0 && eqIndex<value.length()){
- String k = value.substring(0,eqIndex);
- String v = value.substring(eqIndex+1,value.length());
- if(result.containsKey(k)){
- throw new ParseException("Duplicated "+CONFIG_OPT_FLAG+" "+value);
- }else{
- result.put(k,v);
- }
- }else{
- throw new ParseException("Invalid format: -"+CONFIG_OPT_FLAG+" "+value+", required: -"+CONFIG_OPT_FLAG+" key=value");
- }
- }
- }
- return result;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java
new file mode 100644
index 0000000..0c4b1ab
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/EagleOutputCollector.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.core;
+
+import java.io.Serializable;
+
+/**
+ * Exposes a simple interface through which a streaming executor emits
+ * its output data.
+ */
+public interface EagleOutputCollector extends Serializable{
+ void collect(ValuesArray t);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
new file mode 100644
index 0000000..6933a5f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.core;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class JsonSerDeserUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(JsonSerDeserUtils.class);
+
+ public static <T> String serialize(T o) throws Exception{
+ return serialize(o, null);
+ }
+
+ public static <T> String serialize(T o, List<Module> modules) throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ if (modules != null) {
+ mapper.registerModules(modules);
+ }
+ return mapper.writeValueAsString(o);
+ }
+
+ public static <T> T deserialize(String value, Class<T> cls) throws Exception{
+ return deserialize(value, cls, null);
+ }
+
+ public static <T> T deserialize(String value, Class<T> cls, List<Module> modules) throws Exception{
+ ObjectMapper mapper = new ObjectMapper();
+ if (modules != null) {
+ mapper.registerModules(modules);
+ }
+ return mapper.readValue(value, cls);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java
new file mode 100644
index 0000000..fc2a016
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/StreamingProcessConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.core;
+
+public class StreamingProcessConstants {
+ public static final String EVENT_PARTITION_KEY = "eventPartitionKey";
+ public static final String EVENT_STREAM_NAME = "streamName";
+ public static final String EVENT_ATTRIBUTE_MAP = "value";
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java
new file mode 100644
index 0000000..9971cb2
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/ValuesArray.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.core;
+
+import java.util.ArrayList;
+
+/**
+ * Stores multiple data points within a single ValuesArray object that is
+ * sent out.
+ */
+public class ValuesArray extends ArrayList<Object>{
+ private static final long serialVersionUID = -8218427810421668178L;
+
+ public ValuesArray() {
+
+ }
+
+ public ValuesArray(Object... vals) {
+ super(vals.length);
+ for(Object o: vals) {
+ add(o);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java
new file mode 100644
index 0000000..19853ef
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.util;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.commons.cli.*;
+
+import java.util.Map;
+
+/**
+ * @since 8/22/15
+ */
+public abstract class AbstractConfigOptionParser {
+
+ private final Options options;
+ private final Parser parser;
+
+ public AbstractConfigOptionParser(){
+ parser = parser();
+ options = options();
+ }
+
+ /**
+ * @return Parser
+ */
+ protected abstract Parser parser();
+
+ /**
+ * @return Options
+ */
+ protected abstract Options options();
+
+ public abstract Map<String,String> parseConfig(String[] arguments) throws ParseException;
+
+ /**
+ * Load config as system properties
+ *
+ * @param arguments command line arguments
+ * @throws ParseException
+ */
+ public Config load(String[] arguments) throws ParseException {
+ Map<String,String> configProps = parseConfig(arguments);
+ for(Map.Entry<String,String> entry:configProps.entrySet()){
+ System.setProperty(entry.getKey(),entry.getValue());
+ }
+ System.setProperty("config.trace", "loads");
+ return ConfigFactory.load();
+ }
+
+ public CommandLine parse(String[] arguments) throws ParseException {
+ return this.parser.parse(this.options,arguments);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java
new file mode 100644
index 0000000..0bd7559
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.util;
+
+import org.apache.commons.cli.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class ConfigOptionParser extends AbstractConfigOptionParser {
+ private final static String CONFIG_OPT_FLAG = "D";
+
+ @Override
+ protected Parser parser() {
+ return new BasicParser();
+ }
+
+ @Override
+ protected Options options() {
+ Options options = new Options();
+ options.addOption(CONFIG_OPT_FLAG, true, "Config properties in format of \"-D key=value\"");
+ return options;
+ }
+
+ @Override
+ public Map<String,String> parseConfig(String[] arguments) throws ParseException {
+ CommandLine cmd = parse(arguments);
+
+ Map<String,String> result = new HashMap<>();
+ if(cmd.hasOption(CONFIG_OPT_FLAG)){
+ String[] values = cmd.getOptionValues(CONFIG_OPT_FLAG);
+ for(String value:values){
+ int eqIndex = value.indexOf("=");
+ if(eqIndex>0 && eqIndex<value.length()){
+ String k = value.substring(0,eqIndex);
+ String v = value.substring(eqIndex+1,value.length());
+ if(result.containsKey(k)){
+ throw new ParseException("Duplicated "+CONFIG_OPT_FLAG+" "+value);
+ }else{
+ result.put(k,v);
+ }
+ }else{
+ throw new ParseException("Invalid format: -"+CONFIG_OPT_FLAG+" "+value+", required: -"+CONFIG_OPT_FLAG+" key=value");
+ }
+ }
+ }
+ return result;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/Collector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/Collector.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/Collector.scala
deleted file mode 100644
index bbb5f3f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/Collector.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-trait Collector[R] {
- def collect(r : R);
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/EagleTuple.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/EagleTuple.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/EagleTuple.scala
deleted file mode 100644
index 0225329..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/EagleTuple.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-trait EagleTuple extends Serializable{
- def getList : List[AnyRef]
-}
-
-case class Tuple1[T0](f0 : T0) extends EagleTuple{
- override def getList : List[AnyRef] = {
- return List(f0.asInstanceOf[AnyRef])
- }
-}
-
-case class Tuple2[T0, T1](f0 : T0, f1: T1) extends EagleTuple{
- override def getList : List[AnyRef] = {
- return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef])
- }
-}
-
-case class Tuple3[T0, T1, T2](f0 : T0, f1: T1, f2: T2) extends EagleTuple{
- override def getList : List[AnyRef] = {
- return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef])
- }
-}
-
-case class Tuple4[T0, T1, T2, T3](f0 : T0, f1: T1, f2: T2, f3 : T3) extends EagleTuple{
- override def getList : List[AnyRef] = {
- return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef], f3.asInstanceOf[AnyRef])
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/FlatMapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/FlatMapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/FlatMapper.scala
deleted file mode 100644
index 488ba9f..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/FlatMapper.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-trait FlatMapper[T, R] extends Serializable{
- def flatMap(input : T, collector : Collector[R])
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/StormStreamExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/StormStreamExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/StormStreamExecutor.scala
deleted file mode 100644
index 73b7a3e..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/eagle/datastream/StormStreamExecutor.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.datastream
-
-import com.typesafe.config.Config
-
-trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
- def prepareConfig(config : Config)
- def init
- def fields : Array[String]
-}
-
-trait JavaStormStreamExecutor[R <: EagleTuple] extends FlatMapper[java.util.List[AnyRef], R] {
- def prepareConfig(config : Config)
- def init
- def fields : Array[String]
- override def toString() = this.getClass.getSimpleName
-}
-
-abstract class StormStreamExecutor1[T0] extends StormStreamExecutor[Tuple1[T0]] {
- override def fields = Array("f0")
-}
-
-abstract class JavaStormStreamExecutor1[T0] extends JavaStormStreamExecutor[Tuple1[T0]] {
- override def fields = Array("f0")
-}
-
-abstract class StormStreamExecutor2[T0, T1] extends StormStreamExecutor[Tuple2[T0, T1]] {
- override def fields = Array("f0", "f1")
-}
-
-abstract class JavaStormStreamExecutor2[T0, T1] extends JavaStormStreamExecutor[Tuple2[T0, T1]] {
- override def fields = Array("f0", "f1")
-}
-
-abstract class StormStreamExecutor3[T0, T1, T2] extends StormStreamExecutor[Tuple3[T0, T1, T2]] {
- override def fields = Array("f0", "f1", "f2")
-}
-
-abstract class JavaStormStreamExecutor3[T0, T1, T2] extends JavaStormStreamExecutor[Tuple3[T0, T1, T2]] {
- override def fields = Array("f0", "f1", "f2")
-}
-
-abstract class StormStreamExecutor4[T0, T1, T2, T3] extends StormStreamExecutor[Tuple4[T0, T1, T2, T3]] {
- override def fields = Array("f0", "f1", "f2", "f3")
-}
-
-abstract class JavaStormStreamExecutor4[T0, T1, T2, T3] extends JavaStormStreamExecutor[Tuple4[T0, T1, T2, T3]] {
- override def fields = Array("f0", "f1", "f2", "f3")
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/Collector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/Collector.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/Collector.scala
new file mode 100644
index 0000000..2532063
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/Collector.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+trait Collector[R] {
+ def collect(r : R);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
new file mode 100644
index 0000000..5c89a41
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+trait EagleTuple extends Serializable{
+ def getList : List[AnyRef]
+}
+
+case class Tuple1[T0](f0 : T0) extends EagleTuple{
+ override def getList : List[AnyRef] = {
+ return List(f0.asInstanceOf[AnyRef])
+ }
+}
+
+case class Tuple2[T0, T1](f0 : T0, f1: T1) extends EagleTuple{
+ override def getList : List[AnyRef] = {
+ return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef])
+ }
+}
+
+case class Tuple3[T0, T1, T2](f0 : T0, f1: T1, f2: T2) extends EagleTuple{
+ override def getList : List[AnyRef] = {
+ return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef])
+ }
+}
+
+case class Tuple4[T0, T1, T2, T3](f0 : T0, f1: T1, f2: T2, f3 : T3) extends EagleTuple{
+ override def getList : List[AnyRef] = {
+ return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef], f3.asInstanceOf[AnyRef])
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
new file mode 100644
index 0000000..22e063d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+trait FlatMapper[T, R] extends Serializable{
+ def flatMap(input : T, collector : Collector[R])
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
new file mode 100644
index 0000000..b3d77e9
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import com.typesafe.config.Config
+
+trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
+ def prepareConfig(config : Config)
+ def init
+ def fields : Array[String]
+}
+
+trait JavaStormStreamExecutor[R <: EagleTuple] extends FlatMapper[java.util.List[AnyRef], R] {
+ def prepareConfig(config : Config)
+ def init
+ def fields : Array[String]
+ override def toString() = this.getClass.getSimpleName
+}
+
+abstract class StormStreamExecutor1[T0] extends StormStreamExecutor[Tuple1[T0]] {
+ override def fields = Array("f0")
+}
+
+abstract class JavaStormStreamExecutor1[T0] extends JavaStormStreamExecutor[Tuple1[T0]] {
+ override def fields = Array("f0")
+}
+
+abstract class StormStreamExecutor2[T0, T1] extends StormStreamExecutor[Tuple2[T0, T1]] {
+ override def fields = Array("f0", "f1")
+}
+
+abstract class JavaStormStreamExecutor2[T0, T1] extends JavaStormStreamExecutor[Tuple2[T0, T1]] {
+ override def fields = Array("f0", "f1")
+}
+
+abstract class StormStreamExecutor3[T0, T1, T2] extends StormStreamExecutor[Tuple3[T0, T1, T2]] {
+ override def fields = Array("f0", "f1", "f2")
+}
+
+abstract class JavaStormStreamExecutor3[T0, T1, T2] extends JavaStormStreamExecutor[Tuple3[T0, T1, T2]] {
+ override def fields = Array("f0", "f1", "f2")
+}
+
+abstract class StormStreamExecutor4[T0, T1, T2, T3] extends StormStreamExecutor[Tuple4[T0, T1, T2, T3]] {
+ override def fields = Array("f0", "f1", "f2", "f3")
+}
+
+abstract class JavaStormStreamExecutor4[T0, T1, T2, T3] extends JavaStormStreamExecutor[Tuple4[T0, T1, T2, T3]] {
+ override def fields = Array("f0", "f1", "f2", "f3")
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/EmbeddedHbase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/EmbeddedHbase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/EmbeddedHbase.java
deleted file mode 100644
index 980d410..0000000
--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/EmbeddedHbase.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.service.hbase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class EmbeddedHbase {
- private HBaseTestingUtility util;
- private MiniHBaseCluster hBaseCluster;
- private static EmbeddedHbase hbase;
- private int port;
- private String znode;
- private static int DEFAULT_PORT = 2181;
- private static String DEFAULT_ZNODE = "/hbase-unsecure";
- private static final Logger LOG = LoggerFactory.getLogger(EmbeddedHbase.class);
-
- private EmbeddedHbase(int port, String znode) {
- this.port = port;
- this.znode = znode;
- }
-
- private EmbeddedHbase(int port) {
- this(port, DEFAULT_ZNODE);
- }
-
- public static EmbeddedHbase getInstance() {
- if (hbase == null) {
- synchronized(EmbeddedHbase.class) {
- if (hbase == null) {
- hbase = new EmbeddedHbase();
- hbase.start();
- }
- }
- }
- return hbase;
- }
-
- private EmbeddedHbase() {
- this(DEFAULT_PORT, DEFAULT_ZNODE);
- }
-
- public void start() {
- try {
- util = new HBaseTestingUtility();
- Configuration conf= util.getConfiguration();
- conf.setInt("test.hbase.zookeeper.property.clientPort", port);
- conf.set("zookeeper.znode.parent", znode);
- conf.setInt("hbase.zookeeper.property.maxClientCnxns", 200);
- conf.setInt("hbase.master.info.port", -1);//avoid port clobbering
- // start mini hbase cluster
- hBaseCluster = util.startMiniCluster();
- Configuration config = hBaseCluster.getConf();
-
- config.set("zookeeper.session.timeout", "120000");
- config.set("hbase.zookeeper.property.tickTime", "6000");
- config.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
- config.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "1");
- config.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- shutdown();
- }
- });
- }
- catch (Throwable t) {
- LOG.error("Got an exception: ",t);
- }
- }
-
- public void shutdown() {
- try {
- util.shutdownMiniCluster();
- }
- catch (Throwable t) {
- LOG.info("Got an exception, " + t , t.getCause());
- try {
- util.shutdownMiniCluster();
- }
- catch (Throwable t1) {
- }
- }
- }
-
- public void createTable(String tableName, String cf) {
- try {
- util.createTable(tableName, cf);
- }
- catch (Exception ex) {
- LOG.warn("Create table failed, probably table already existed, table name: " + tableName);
- }
- }
-
- public void deleteTable(String tableName){
- try {
- util.deleteTable(tableName);
- }
- catch (Exception ex) {
- LOG.warn("Delete table failed, probably table not existed, table name: " + tableName);
- }
- }
-
- public static void main(String[] args){
- EmbeddedHbase hbase = new EmbeddedHbase();
- hbase.start();
- for(String table : new Tables().getTables()){
- hbase.createTable(table, "f");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/Tables.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/Tables.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/Tables.java
deleted file mode 100644
index 43bdbc6..0000000
--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/eagle/service/hbase/Tables.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.service.hbase;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class Tables {
- List<String> tables = new ArrayList<String>();
- public Tables(){
- tables.add("eagle_metric");
-
- tables.add("actiondetail");
- tables.add("alertdetail");
- tables.add("alertgroup");
- tables.add("alertmeta");
- tables.add("alertMetaEntity");
-
- // for alert framework
- tables.add("alertDataSource");
- tables.add("alertStream");
- tables.add("alertExecutor");
- tables.add("alertStreamSchema");
- tables.add("alertdef");
-
- // for security
- tables.add("hiveResourceSensitivity");
- tables.add("fileSensitivity");
- tables.add("mlmodel");
- tables.add("userprofile");
- }
-
- public List<String> getTables(){
- return this.tables;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
new file mode 100644
index 0000000..c204fac
--- /dev/null
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/EmbeddedHbase.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs an in-process HBase mini cluster, backed by {@link HBaseTestingUtility}.
+ *
+ * <p>{@link #getInstance()} lazily creates and starts one shared instance.
+ * {@link #main(String[])} starts a separate instance and creates every table
+ * listed by {@link Tables} with the column family {@code "f"}.
+ */
+public class EmbeddedHbase {
+    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedHbase.class);
+    private static final int DEFAULT_PORT = 2181;
+    private static final String DEFAULT_ZNODE = "/hbase-unsecure";
+
+    // volatile because getInstance() reads this field outside the lock.
+    private static volatile EmbeddedHbase hbase;
+
+    private final int port;
+    private final String znode;
+    private HBaseTestingUtility util;
+    private MiniHBaseCluster hBaseCluster;
+
+    private EmbeddedHbase(int port, String znode) {
+        this.port = port;
+        this.znode = znode;
+    }
+
+    private EmbeddedHbase(int port) {
+        this(port, DEFAULT_ZNODE);
+    }
+
+    private EmbeddedHbase() {
+        this(DEFAULT_PORT, DEFAULT_ZNODE);
+    }
+
+    /**
+     * Returns the shared instance, creating and starting it on first use.
+     *
+     * <p>The instance is published only after {@link #start()} has returned, so
+     * other threads never observe a half-started object. Because start()
+     * logs failures instead of throwing, the returned instance may wrap a
+     * cluster that failed to come up.
+     *
+     * @return the shared embedded HBase instance
+     */
+    public static EmbeddedHbase getInstance() {
+        EmbeddedHbase instance = hbase;
+        if (instance == null) {
+            synchronized (EmbeddedHbase.class) {
+                instance = hbase;
+                if (instance == null) {
+                    instance = new EmbeddedHbase();
+                    instance.start();
+                    hbase = instance;
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Configures and starts the mini cluster, then registers a JVM shutdown hook
+     * that calls {@link #shutdown()}.
+     *
+     * <p>Best effort: any failure is logged and not rethrown.
+     */
+    public void start() {
+        try {
+            util = new HBaseTestingUtility();
+            Configuration conf = util.getConfiguration();
+            // NOTE(review): presumably the client port the mini ZooKeeper binds to -- confirm
+            conf.setInt("test.hbase.zookeeper.property.clientPort", port);
+            conf.set("zookeeper.znode.parent", znode);
+            conf.setInt("hbase.zookeeper.property.maxClientCnxns", 200);
+            conf.setInt("hbase.master.info.port", -1); // avoid port clobbering
+            // start mini hbase cluster
+            hBaseCluster = util.startMiniCluster();
+            Configuration config = hBaseCluster.getConf();
+
+            config.set("zookeeper.session.timeout", "120000");
+            config.set("hbase.zookeeper.property.tickTime", "6000");
+            config.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
+            config.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "1");
+            config.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
+
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    shutdown();
+                }
+            });
+        } catch (Throwable t) {
+            LOG.error("Got an exception: ", t);
+        }
+    }
+
+    /**
+     * Shuts the mini cluster down, retrying once if the first attempt fails.
+     *
+     * <p>Does nothing when the testing utility was never created. Failures are
+     * logged with their stack traces and never rethrown.
+     */
+    public void shutdown() {
+        if (util == null) {
+            // start() was never called, or failed before creating the utility.
+            return;
+        }
+        try {
+            util.shutdownMiniCluster();
+        } catch (Throwable t) {
+            // Log the failure itself; its cause may be null and would hide the stack trace.
+            LOG.info("Got an exception, " + t, t);
+            try {
+                util.shutdownMiniCluster();
+            } catch (Throwable t1) {
+                LOG.warn("Retry of mini cluster shutdown failed", t1);
+            }
+        }
+    }
+
+    /**
+     * Creates a table, logging a warning instead of failing when creation throws.
+     *
+     * @param tableName name of the table to create
+     * @param cf column family name
+     */
+    public void createTable(String tableName, String cf) {
+        try {
+            util.createTable(tableName, cf);
+        } catch (Exception ex) {
+            LOG.warn("Create table failed, probably table already existed, table name: " + tableName, ex);
+        }
+    }
+
+    /**
+     * Deletes a table, logging a warning instead of failing when deletion throws.
+     *
+     * @param tableName name of the table to delete
+     */
+    public void deleteTable(String tableName) {
+        try {
+            util.deleteTable(tableName);
+        } catch (Exception ex) {
+            LOG.warn("Delete table failed, probably table not existed, table name: " + tableName, ex);
+        }
+    }
+
+    /**
+     * Starts a standalone instance (not the shared one) and creates all tables
+     * from {@link Tables} with the column family {@code "f"}.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        EmbeddedHbase hbase = new EmbeddedHbase();
+        hbase.start();
+        for (String table : new Tables().getTables()) {
+            hbase.createTable(table, "f");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
new file mode 100644
index 0000000..a31e27e
--- /dev/null
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/main/java/org/apache/eagle/service/hbase/Tables.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.hbase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Fixed list of table names; {@code EmbeddedHbase.main} creates one table per entry.
+ */
+public class Tables {
+    /** Table names in the order they are added to {@link #tables}. */
+    private static final String[] TABLE_NAMES = {
+        "eagle_metric",
+
+        "actiondetail",
+        "alertdetail",
+        "alertgroup",
+        "alertmeta",
+        "alertMetaEntity",
+
+        // for alert framework
+        "alertDataSource",
+        "alertStream",
+        "alertExecutor",
+        "alertStreamSchema",
+        "alertdef",
+
+        // for security
+        "hiveResourceSensitivity",
+        "fileSensitivity",
+        "mlmodel",
+        "userprofile",
+    };
+
+    List<String> tables = new ArrayList<String>();
+
+    /** Fills the list with every predefined table name. */
+    public Tables() {
+        for (String tableName : TABLE_NAMES) {
+            tables.add(tableName);
+        }
+    }
+
+    /**
+     * @return the backing list itself (not a copy), in insertion order
+     */
+    public List<String> getTables() {
+        return tables;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/eagle/service/hbase/TestHBaseBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/eagle/service/hbase/TestHBaseBase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/eagle/service/hbase/TestHBaseBase.java
deleted file mode 100644
index fc15f70..0000000
--- a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/eagle/service/hbase/TestHBaseBase.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.service.hbase;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestHBaseBase {
- protected static EmbeddedHbase hbase;
-
- @BeforeClass
- public static void setUpHBase() {
- hbase = EmbeddedHbase.getInstance();
- }
-
- @Test
- public void test() {
-
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
new file mode 100644
index 0000000..87fffcd
--- /dev/null
+++ b/eagle-core/eagle-embed/eagle-embed-hbase/src/test/java/org/apache/eagle/service/hbase/TestHBaseBase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.hbase;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test class that obtains the shared {@link EmbeddedHbase} instance once per class.
+ * Presumably meant to be extended by HBase-backed tests (the field is protected) --
+ * TODO confirm.
+ */
+public class TestHBaseBase {
+
+    /** Assigned by {@link #setUpHBase()} before any test method runs. */
+    protected static EmbeddedHbase hbase;
+
+    /** Fetches the instance from {@link EmbeddedHbase#getInstance()}. */
+    @BeforeClass
+    public static void setUpHBase() {
+        hbase = EmbeddedHbase.getInstance();
+    }
+
+    /** Empty test; it makes no assertions of its own. */
+    @Test
+    public void test() {
+        // intentionally left blank
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-server/runEmbeddedServer.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-server/runEmbeddedServer.sh b/eagle-core/eagle-embed/eagle-embed-server/runEmbeddedServer.sh
index 39fd45a..c3192c2 100755
--- a/eagle-core/eagle-embed/eagle-embed-server/runEmbeddedServer.sh
+++ b/eagle-core/eagle-embed/eagle-embed-server/runEmbeddedServer.sh
@@ -1,4 +1,4 @@
#!/bin/bash
export JAVA_HOME=$(/usr/libexec/java_home -v 1.7)
-mvn -X exec:java -Dexec.mainClass="eagle.service.embedded.tomcat.EmbeddedServer" -Dexec.args="../../../eagle-security/eagle-security-webservice/target/eagle-service 38080"
\ No newline at end of file
+mvn -X exec:java -Dexec.mainClass="org.apache.eagle.service.embedded.tomcat.EmbeddedServer" -Dexec.args="../../../eagle-security/eagle-security-webservice/target/eagle-service 38080"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-server/src/main/java/eagle/service/embedded/tomcat/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-server/src/main/java/eagle/service/embedded/tomcat/EmbeddedServer.java b/eagle-core/eagle-embed/eagle-embed-server/src/main/java/eagle/service/embedded/tomcat/EmbeddedServer.java
deleted file mode 100644
index 02e7988..0000000
--- a/eagle-core/eagle-embed/eagle-embed-server/src/main/java/eagle/service/embedded/tomcat/EmbeddedServer.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.service.embedded.tomcat;
-
-import java.io.File;
-
-import org.apache.catalina.LifecycleState;
-import org.apache.catalina.startup.Tomcat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EmbeddedServer {
-
- private static EmbeddedServer server;
- private Tomcat tomcat;
- private String webappDirLocation;
- private int port;
- private static int DEFAULT_TOMCAT_PORT = 38080;
- private static final Logger LOG = LoggerFactory.getLogger(EmbeddedServer.class);
-
- public static void main(String[] args) {
- // args: webappLocation, port
- int tomcatPort;
- if (args.length > 1) {
- tomcatPort = Integer.valueOf(args[1]);
- } else {
- tomcatPort = DEFAULT_TOMCAT_PORT;
- }
- new EmbeddedServer(args[0], tomcatPort).start();
- }
-
- private EmbeddedServer(String webappDirLocation) {
- this(webappDirLocation, DEFAULT_TOMCAT_PORT);
- }
-
- private EmbeddedServer(String webappDirLocation, int port) {
- this.webappDirLocation = webappDirLocation;
- this.port = port;
- }
-
- public static EmbeddedServer getInstance(String webappDirLocation) {
- if (server == null) {
- synchronized(EmbeddedServer.class) {
- if (server == null) {
- server = new EmbeddedServer(webappDirLocation);
- server.start();
- }
- }
- }
- return server;
- }
-
- public int getPort() {
- return port;
- }
-
- public void start() {
- tomcat = new Tomcat();
- tomcat.setHostname("localhost");
- tomcat.setPort(port);
- try {
- tomcat.addWebapp("/eagle-service", new File(webappDirLocation).getAbsolutePath());
- tomcat.start();
-
- } catch (Exception ex) {
- LOG.error("Got an exception " + ex.getMessage());
- }
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- try {
- shutdown();
- }
- catch (Throwable t) {
- LOG.error("Got an exception why shutting down..." + t.getMessage());
- }
- }
- });
- try {
- Thread.sleep(10000000);
- }catch(Exception ex){
- ex.printStackTrace();
- }
- }
-
- public void shutdown() throws Throwable {
- if (tomcat.getServer() != null && tomcat.getServer().getState() != LifecycleState.DESTROYED) {
- if (tomcat.getServer().getState() != LifecycleState.STOPPED) {
- tomcat.stop();
- }
- tomcat.destroy();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-server/src/main/java/org/apache/eagle/service/embedded/tomcat/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-server/src/main/java/org/apache/eagle/service/embedded/tomcat/EmbeddedServer.java b/eagle-core/eagle-embed/eagle-embed-server/src/main/java/org/apache/eagle/service/embedded/tomcat/EmbeddedServer.java
new file mode 100644
index 0000000..35d3967
--- /dev/null
+++ b/eagle-core/eagle-embed/eagle-embed-server/src/main/java/org/apache/eagle/service/embedded/tomcat/EmbeddedServer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.embedded.tomcat;
+
+import java.io.File;
+
+import org.apache.catalina.LifecycleState;
+import org.apache.catalina.startup.Tomcat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hosts a web application directory in an embedded Tomcat under the context
+ * path {@code /eagle-service}.
+ *
+ * <p>Use {@link #getInstance(String)} for a shared, already-started server, or
+ * run {@link #main(String[])} to start a standalone server that keeps the
+ * calling thread parked.
+ */
+public class EmbeddedServer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedServer.class);
+    private static final int DEFAULT_TOMCAT_PORT = 38080;
+    /** How long {@link #main(String[])} parks its thread after starting Tomcat. */
+    private static final long KEEP_ALIVE_MILLIS = 10000000L;
+
+    // volatile because getInstance() reads this field outside the lock.
+    private static volatile EmbeddedServer server;
+
+    private final String webappDirLocation;
+    private final int port;
+    private Tomcat tomcat;
+
+    /**
+     * Command-line entry point.
+     *
+     * @param args {@code args[0]} is the webapp directory (required);
+     *             {@code args[1]} is the port (optional, defaults to 38080)
+     * @throws IllegalArgumentException if the webapp directory is missing
+     * @throws NumberFormatException if the port is not an integer
+     */
+    public static void main(String[] args) {
+        // args: webappLocation, port
+        if (args.length < 1) {
+            throw new IllegalArgumentException("Usage: EmbeddedServer <webappLocation> [port]");
+        }
+        int tomcatPort = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_TOMCAT_PORT;
+        new EmbeddedServer(args[0], tomcatPort).start();
+        // Park here rather than inside start(), so getInstance() callers are not blocked.
+        keepAlive();
+    }
+
+    private EmbeddedServer(String webappDirLocation) {
+        this(webappDirLocation, DEFAULT_TOMCAT_PORT);
+    }
+
+    private EmbeddedServer(String webappDirLocation, int port) {
+        this.webappDirLocation = webappDirLocation;
+        this.port = port;
+    }
+
+    /**
+     * Returns the shared server, creating and starting it on the default port
+     * on first use. The location passed on later calls is ignored.
+     *
+     * <p>The instance is published only after {@link #start()} has returned.
+     *
+     * @param webappDirLocation directory of the web application to deploy
+     * @return the shared server instance
+     */
+    public static EmbeddedServer getInstance(String webappDirLocation) {
+        EmbeddedServer instance = server;
+        if (instance == null) {
+            synchronized (EmbeddedServer.class) {
+                instance = server;
+                if (instance == null) {
+                    instance = new EmbeddedServer(webappDirLocation);
+                    instance.start();
+                    server = instance;
+                }
+            }
+        }
+        return instance;
+    }
+
+    /** @return the port this server was configured with */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * Starts Tomcat on {@code localhost} with the configured port and webapp,
+     * then registers a JVM shutdown hook that calls {@link #shutdown()}.
+     *
+     * <p>Startup failures are logged and not rethrown. This method returns once
+     * startup has been attempted; it does not park the calling thread.
+     */
+    public void start() {
+        tomcat = new Tomcat();
+        tomcat.setHostname("localhost");
+        tomcat.setPort(port);
+        try {
+            tomcat.addWebapp("/eagle-service", new File(webappDirLocation).getAbsolutePath());
+            tomcat.start();
+        } catch (Exception ex) {
+            LOG.error("Got an exception " + ex.getMessage(), ex);
+        }
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                try {
+                    shutdown();
+                } catch (Throwable t) {
+                    LOG.error("Got an exception while shutting down..." + t.getMessage(), t);
+                }
+            }
+        });
+    }
+
+    /** Parks the current thread for {@link #KEEP_ALIVE_MILLIS} or until interrupted. */
+    private static void keepAlive() {
+        try {
+            Thread.sleep(KEEP_ALIVE_MILLIS);
+        } catch (InterruptedException ex) {
+            // Restore the flag so code further up the stack can see the interrupt.
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while keeping the embedded server alive", ex);
+        }
+    }
+
+    /**
+     * Stops Tomcat if it is not already stopped, then destroys it. Does nothing
+     * when there is no server object or it is already destroyed.
+     *
+     * @throws Throwable whatever Tomcat's stop or destroy raises
+     */
+    public void shutdown() throws Throwable {
+        if (tomcat.getServer() != null && tomcat.getServer().getState() != LifecycleState.DESTROYED) {
+            if (tomcat.getServer().getState() != LifecycleState.STOPPED) {
+                tomcat.stop();
+            }
+            tomcat.destroy();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-server/src/test/java/eagle/service/embedded/tomcat/TestEmbeddedServer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-server/src/test/java/eagle/service/embedded/tomcat/TestEmbeddedServer.java b/eagle-core/eagle-embed/eagle-embed-server/src/test/java/eagle/service/embedded/tomcat/TestEmbeddedServer.java
deleted file mode 100644
index 6039d85..0000000
--- a/eagle-core/eagle-embed/eagle-embed-server/src/test/java/eagle/service/embedded/tomcat/TestEmbeddedServer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.service.embedded.tomcat;
-
-public class TestEmbeddedServer {
-
- //@Test
- public void test() throws Throwable{
- String webappDirLocation = "../../../eagle-webservice/target/eagle-service";
- EmbeddedServer.getInstance(webappDirLocation);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-embed/eagle-embed-server/src/test/java/org/apache/eagle/service/embedded/tomcat/TestEmbeddedServer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-embed/eagle-embed-server/src/test/java/org/apache/eagle/service/embedded/tomcat/TestEmbeddedServer.java b/eagle-core/eagle-embed/eagle-embed-server/src/test/java/org/apache/eagle/service/embedded/tomcat/TestEmbeddedServer.java
new file mode 100644
index 0000000..1351ac7
--- /dev/null
+++ b/eagle-core/eagle-embed/eagle-embed-server/src/test/java/org/apache/eagle/service/embedded/tomcat/TestEmbeddedServer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.embedded.tomcat;
+
+/**
+ * Manual check that obtains the shared {@link EmbeddedServer} for the eagle-service webapp.
+ */
+public class TestEmbeddedServer {
+
+    /**
+     * Requests the shared server for a webapp directory given relative to the
+     * working directory. The annotation below is commented out, so JUnit does
+     * not run this method.
+     */
+    //@Test
+    public void test() throws Throwable {
+        EmbeddedServer.getInstance("../../../eagle-webservice/target/eagle-service");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/eagle/ml/MLAlgorithmEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/eagle/ml/MLAlgorithmEvaluator.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/eagle/ml/MLAlgorithmEvaluator.java
deleted file mode 100644
index 4f10fc2..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/eagle/ml/MLAlgorithmEvaluator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.ml;
-
-import eagle.dataproc.core.ValuesArray;
-import eagle.ml.model.MLAlgorithm;
-import com.typesafe.config.Config;
-
-/**
- * Machine Learning Algorithm Evaluator
- */
-public interface MLAlgorithmEvaluator {
- /**
- * Prepare Machine learning algorithm
- *
- * @param algorithm MLAlgorithm instance
- */
- public void init(MLAlgorithm algorithm, Config config);
-
- /**
- * Evaluate input user profile model
- *
- * @param data ValuesArray
- * @throws Exception
- */
- public void evaluate(ValuesArray data) throws Exception;
-
- /**
- * Register callback
- *
- * @param callbackObj MachineLearningCallback
- * @throws Exception
- */
- public void register(MLAnomalyCallback callbackObj) throws Exception;
-}
\ No newline at end of file