You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/12/16 07:01:44 UTC
[2/5] incubator-eagle git commit: EAGLE-66 Typesafe Streaming DSL and
KeyValue based Grouping
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NameConstants.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NameConstants.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NameConstants.scala
new file mode 100644
index 0000000..d4836e4
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NameConstants.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.utils
+
+object NameConstants {
+ val FIELD_PREFIX = "f"
+ val FIELD_KEY = "key"
+ val FIELD_VALUE = "value"
+ val FIELD_SEPARATOR = "_"
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala
new file mode 100644
index 0000000..331cf7c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/NodeNameSelector.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.utils
+
+import org.apache.eagle.datastream.core.StreamInfo
+
+case class NodeNameSelector(producer : StreamInfo) {
+ def getName : String = {
+ producer.name match {
+ case null => producer.toString+NameConstants.FIELD_SEPARATOR+producer.id
+ case _ => producer.name
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
new file mode 100644
index 0000000..1d48752
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.datastream.utils
+
+import scala.reflect.api
+import scala.reflect.runtime.{universe => ru}
+
+/**
+ * @since 12/7/15
+ */
+object Reflections{
+ private val UNIT_CLASS = classOf[Unit]
+ private val UNIT_TYPE_TAG = ru.typeTag[Unit]
+
+ /**
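+ * Builds a runtime TypeTag for the given class.
+ * @param clazz class to convert; null yields null, classOf[Unit] yields the cached Unit tag
+ * @tparam T type represented by clazz
+ * @return TypeTag created from the runtime mirror of clazz's class loader, or null if clazz is null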
+ */
+ def typeTag[T](clazz:Class[T]):ru.TypeTag[T]={
+ if(clazz == null){
+ null
+ }else if(clazz == UNIT_CLASS) {
+ UNIT_TYPE_TAG.asInstanceOf[ru.TypeTag[T]]
+ } else {
+ val mirror = ru.runtimeMirror(clazz.getClassLoader)
+ val sym = mirror.staticClass(clazz.getCanonicalName)
+ val tpe = sym.selfType
+ ru.TypeTag(mirror, new api.TypeCreator {
+ def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
+ if (m eq mirror) tpe.asInstanceOf[U#Type]
+ else throw new IllegalArgumentException(s"Type tag defined in $mirror cannot be migrated to other mirrors.")
+ })
+ }
+ }
+
+ def javaTypeClass[T](obj: AnyRef, index: Int = 0):Class[T] = JavaReflections.getGenericTypeClass(obj,index).asInstanceOf[Class[T]]
+ def javaTypeTag[T](obj: AnyRef, index: Int = 0):ru.TypeTag[T] = typeTag(JavaReflections.getGenericTypeClass(obj,index)).asInstanceOf[ru.TypeTag[T]]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala
new file mode 100644
index 0000000..4ac0cdc
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/UnionUtils.scala
@@ -0,0 +1,43 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream.utils
+
+import java.util
+
+import org.apache.eagle.datastream.core.StreamProducer
+
+import scala.collection.JavaConverters._
+
+object UnionUtils {
+ def join[T1,T2](producers : StreamProducer[T1]*) : StreamProducer[T2] = {
+ producers.head.streamUnion(producers.drop(1))
+ }
+
+ def join[T1,T2](producers : java.util.List[StreamProducer[T1]]) : StreamProducer[T2] = {
+ val newList = new util.ArrayList(producers)
+ val head = newList.get(0)
+ newList.remove(0)
+ head.streamUnion(newList.asScala);
+ }
+
+ def join[T1,T2](producers : List[StreamProducer[T1]]) : StreamProducer[T2] = {
+ val head = producers.head
+ head.streamUnion(producers.tail);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java
new file mode 100644
index 0000000..c5f9045
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestExecutionEnvironmentJava.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
+import org.junit.Test;
+
+/**
+ * @since 12/5/15
+ */
+public class TestExecutionEnvironmentJava {
+
+ @Test
+ public void testGetEnvInJava() {
+ StormExecutionEnvironment env0 = ExecutionEnvironments.get(StormExecutionEnvironment.class);
+ Assert.assertNotNull(env0);
+
+ StormExecutionEnvironment env1 = ExecutionEnvironments.get(new String[]{}, StormExecutionEnvironment.class);
+ Assert.assertNotNull(env1);
+ Config config = ConfigFactory.load();
+ StormExecutionEnvironment env2 = ExecutionEnvironments.get(config, StormExecutionEnvironment.class);
+ Assert.assertNotNull(env2);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
index 07c19de..979b09d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaMain.java
@@ -33,22 +33,22 @@ public class TestJavaMain {
}
//@Test
- public void testGeneral(){
- Config config = ConfigFactory.load();
- StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
- env.newSource(new TestKeyValueSpout()).renameOutputFields(2).groupBy(Arrays.asList(0)).flatMap(new GroupedEchoExecutor()).withParallelism(2);
- env.execute();
- }
+// public void testGeneral(){
+// Config config = ConfigFactory.load();
+// StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
+// env.fromSpout(new TestKeyValueSpout()).withOutputFields(2).groupBy(Arrays.asList(0)).flatMap(new GroupedEchoExecutor()).parallelism(2);
+// env.execute();
+// }
- //@Test
- public void testMap(){
- Config config = ConfigFactory.load();
- StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
- SerializableFunction1 f1 = new SerializableFunction1<Object, Object>();
- env.newSource(new TestKeyValueSpout()).renameOutputFields(2).
- map1(f1);
- env.execute();
- }
+// //@Test
+// public void testMap(){
+// Config config = ConfigFactory.load();
+// StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
+// SerializableFunction1 f1 = new SerializableFunction1<Object, Object>();
+// env.fromSpout(new TestKeyValueSpout()).withOutputFields(2).
+// map1(f1);
+// env.execute();
+// }
@Test
public void test() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java
new file mode 100644
index 0000000..6d2d36f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream;
+
+import junit.framework.Assert;
+import org.apache.eagle.datastream.utils.Reflections;
+import org.junit.Test;
+import scala.reflect.api.TypeTags;
+
+/**
+ * @since 12/8/15
+ */
+public class TestJavaReflectionUtils {
+ @Test
+ public void testJavaFlatMapper(){
+ Class<String> clazz = Reflections.javaTypeClass(new JavaEchoExecutor(), 0);
+ Assert.assertEquals(String.class,clazz);
+ TypeTags.TypeTag typeTag = Reflections.typeTag(clazz);
+ Assert.assertNotNull(typeTag);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
index 5651e74..2a5043c 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestKafkaStreamMonitor.java
@@ -16,8 +16,7 @@
*/
package org.apache.eagle.datastream;
-import org.apache.eagle.datastream.kafka.KafkaStreamMonitorApp;
-import org.apache.eagle.datastream.kafka.KafkaStreamMonitorApp;
+import org.apache.eagle.datastream.storm.KafkaStreamMonitorApp;
/**
* @since 11/7/15
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
index 5463079..4339e5a 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
@@ -19,46 +19,44 @@
package org.apache.eagle.datastream
-import java.util
-
-import com.typesafe.config.{ConfigFactory, Config}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment
object testStreamUnionExpansion extends App{
- val config : Config = ConfigFactory.load;
+ val config : Config = ConfigFactory.load
val env = new StormExecutionEnvironment(config)
- val tail1 = env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
- val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a))
+ val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
+ val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a))
tail1.streamUnion(List(tail2)).map1(a => "xyz")
-// env.execute
}
object testStreamGroupbyExpansion extends App{
val config : Config = ConfigFactory.load;
val env = new StormExecutionEnvironment(config)
- env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).groupBy(1).map2(a => ("key1",a))
+ env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).groupBy(1).map2(a => ("key1",a))
//env.execute
}
object testStreamUnionAndGroupbyExpansion extends App{
val config : Config = ConfigFactory.load;
val env = new StormExecutionEnvironment(config)
- val tail1 = env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(1)
- val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(0)
+ val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(1)
+ val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a)).groupBy(0)
tail1.streamUnion(List(tail2)).map1(a => "xyz")
- //env.execute
+// env.execute()
}
/**
* 1. stream schema
* curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
* 2. policy
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
+ * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
*/
object testAlertExpansion extends App{
val config : Config = ConfigFactory.load;
val env = new StormExecutionEnvironment(config)
- val tail1 = env.newSource(TestSpout()).withName("testSpout1")
- .flatMap(WordPrependForAlertExecutor("test")).withName("prepend")
+ val tail1 = env.fromSpout(TestSpout()).nameAs("testSpout1")
+ .flatMap(WordPrependForAlertExecutor("test")).nameAs("prepend")
.alertWithConsumer("s1", "alert1")
//env.execute
}
@@ -68,14 +66,14 @@ object testAlertExpansion extends App{
* curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
* curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s2","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
* 2. policy
- * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
+ * curl -X POST -H 'Content-Type:application/json' "http://localhost:38080/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
*/
object testAlertExpansionWithUnion extends App{
val config : Config = ConfigFactory.load;
val env = new StormExecutionEnvironment(config)
- val tail1 = env.newSource(TestSpout()).withName("testSpout1").flatMap(WordPrependForAlertExecutor("test")).withName("prepend") //.map2(a => ("key1",a))
- val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")) //.map2(a => ("key1",a))
- tail1.streamUnion(List(tail2)).alert(util.Arrays.asList("s1","s2"), "alert1", true)
+ val tail1 = env.fromSpout(TestSpout()).nameAs("testSpout1").flatMap(WordPrependForAlertExecutor("test")).nameAs("prepend") //.map2(a => ("key1",a))
+ val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")) //.map2(a => ("key1",a))
+ tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = true)
//env.execute
}
@@ -83,7 +81,7 @@ object testAlertExpansionWithUnion extends App{
object testStreamUnionExpansionWithSharedSpout extends App{
val config : Config = ConfigFactory.load;
val env = new StormExecutionEnvironment(config)
- val source = env.newSource(TestSpout())
+ val source = env.fromSpout(TestSpout())
val tail1 = source.flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
val tail2 = source.flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key1",a))
tail1.streamUnion(List(tail2)).map1(a => {
@@ -96,11 +94,11 @@ object testStreamUnionExpansionWithSharedSpout extends App{
object testStreamUnionExpansionWithSharedSpout_2 extends App{
val config : Config = ConfigFactory.load;
val env = new StormExecutionEnvironment(config)
- val source = env.newSource(TestSpout())
+ val source = env.fromSpout(TestSpout())
val tail1 = source.flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
source.streamUnion(List(tail1)).map1(a => {
println(a)
"xyz"
})
// env.execute
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
new file mode 100644
index 0000000..b43f42e
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import com.typesafe.config.ConfigFactory
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment
+
+/**
+ * @since 12/5/15
+ */
+object TestExecutionEnvironment extends App{
+ val env0 = ExecutionEnvironments.get[StormExecutionEnvironment]
+ println(env0)
+ val config = ConfigFactory.load()
+ val env1 = ExecutionEnvironments.get[StormExecutionEnvironment](config)
+ println(env1)
+ val env2 = ExecutionEnvironments.get[StormExecutionEnvironment](Array[String]("-D","key=value"))
+ println(env2)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
index c550182..27fdd88 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestStormRunner.scala
@@ -16,9 +16,7 @@
*/
package org.apache.eagle.datastream
-import java.util
-
-import com.typesafe.config.{ConfigFactory, Config}
+import com.typesafe.config.{Config, ConfigFactory}
/**
* explicit union
@@ -27,14 +25,14 @@ import com.typesafe.config.{ConfigFactory, Config}
* 1. stream schema
* curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" -d '[{"prefix":"alertStreamSchema","tags":{"dataSource":"ds1","streamName":"s1","attrName":"word"},"attrDescription":"word","attrType":"string","category":"","attrValueResolver":""}]'
* 2. policy
- * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
+ * curl -X POST -H 'Content-Type:application/json' "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" -d '[{"tags":{"site":"sandbox","dataSource":"ds1","alertExecutorId":"alert1","policyId":"testAlert","policyType":"siddhiCEPEngine"},"desc":"test alert","policyDef":"{\"type\":\"siddhiCEPEngine\",\"expression\":\"from s1 [(str:regexp(word,'\'.*test.*\'')==true)] select * insert into outputStream ;\"}","dedupeDef":"","notificationDef":"","remediationDef":"","enabled":"true"}]'
*/
object UnionForAlert extends App{
val config : Config = ConfigFactory.load;
- val env = ExecutionEnvironmentFactory.getStorm(config)
- val tail1 = env.newSource(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
- val tail2 = env.newSource(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key2",a))
- tail1.streamUnion(List(tail2)).alert(util.Arrays.asList("s1","s2"), "alert1", false)
+ val env = ExecutionEnvironments.getStorm(config)
+ val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
+ val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key2",a))
+ tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = false)
//env.execute
}
@@ -43,10 +41,10 @@ object UnionForAlert extends App{
*/
object TestAlertAfterFlatMap extends App{
val config : Config = ConfigFactory.load;
- val env = ExecutionEnvironmentFactory.getStorm(config)
- val tail1 = env.newSource(TestSpout())
+ val env = ExecutionEnvironments.getStorm(config)
+ val tail1 = env.fromSpout(TestSpout())
.flatMap(WordPrependForAlertExecutor("test"))
- .alert(util.Arrays.asList("s1"), "alert1", false)
+ .alert(Seq("s1"), "alert1", consume = false)
//env.execute
}
@@ -55,44 +53,44 @@ object TestAlertAfterFlatMap extends App{
*/
object TestAlertAfterMap extends App{
val config : Config = ConfigFactory.load;
- val env = ExecutionEnvironmentFactory.getStorm(config)
- val tail1 = env.newSource(TestSpout())
+ val env = ExecutionEnvironments.getStorm(config)
+ val tail1 = env.fromSpout(TestSpout())
.flatMap(WordPrependForAlertExecutor2("test"))
.map2(a => ("key", a))
- .alert(util.Arrays.asList("s1"), "alert1", false)
+ .alert(Seq("s1"), "alert1", false)
//env.execute
}
object StormRunnerWithoutSplitOrJoin extends Application{
val config : Config = ConfigFactory.load;
- val env = ExecutionEnvironmentFactory.getStorm(config)
- env.newSource(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test"))
+ val env = ExecutionEnvironments.getStorm(config)
+ env.fromSpout(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test"))
.flatMap(PatternAlertExecutor("test.*"))
- //env.execute
+// env.execute()
}
object StormRunnerWithSplit extends Application{
val config : Config = ConfigFactory.load;
- val env = ExecutionEnvironmentFactory.getStorm(config)
- val toBeSplit = env.newSource(TestSpout()).flatMap(EchoExecutor())
+ val env = ExecutionEnvironments.getStorm(config)
+ val toBeSplit = env.fromSpout(TestSpout()).flatMap(EchoExecutor())
toBeSplit.flatMap(WordPrependExecutor("test")).flatMap(PatternAlertExecutor("test.*"))
toBeSplit.flatMap(WordAppendExecutor("test"))
- //env.execute
+// env.execute()
}
object StormRunnerWithUnion extends Application{
val config : Config = ConfigFactory.load;
- val env = ExecutionEnvironmentFactory.getStorm(config)
- val tail1 = env.newSource(TestSpout()).flatMap(WordPrependExecutor("test"))
- val tail2 = env.newSource(TestSpout()).flatMap(WordAppendExecutor("test"))
+ val env = ExecutionEnvironments.getStorm(config)
+ val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependExecutor("test"))
+ val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendExecutor("test"))
tail1.streamUnion(List(tail2)).flatMap(PatternAlertExecutor(".*test.*"))
- //env.execute
+ env.execute()
}
object StormRunnerWithFilter extends Application{
val config : Config = ConfigFactory.load;
- val env = ExecutionEnvironmentFactory.getStorm(config)
- env.newSource(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")).
+ val env = ExecutionEnvironments.getStorm(config)
+ env.fromSpout(TestSpout()).flatMap(EchoExecutor()).flatMap(WordPrependExecutor("test")).
filter(_=>false).
flatMap(PatternAlertExecutor("test.*"))
//env.execute
@@ -100,8 +98,8 @@ object StormRunnerWithFilter extends Application{
object StormRunnerWithJavaExecutor extends Application{
val config : Config = ConfigFactory.load;
- val env = ExecutionEnvironmentFactory.getStorm(config)
- env.newSource(TestSpout()).flatMap(new JavaEchoExecutor()).flatMap(WordPrependExecutor("test")).
+ val env = ExecutionEnvironments.getStorm(config)
+ env.fromSpout(TestSpout()).flatMap(new JavaEchoExecutor()).flatMap(WordPrependExecutor("test")).
filter(_=>false).
flatMap(PatternAlertExecutor("test.*"))
//env.execute
@@ -109,14 +107,14 @@ object StormRunnerWithJavaExecutor extends Application{
object StormRunnerWithKeyValueSpout extends Application{
val config : Config = ConfigFactory.load;
- val env = ExecutionEnvironmentFactory.getStorm(config)
- env.newSource(TestKeyValueSpout()).groupBy(1).flatMap(new GroupedEchoExecutor()).withParallelism(2)
+ val env = ExecutionEnvironments.getStorm(config)
+ env.fromSpout(TestKeyValueSpout()).groupBy(1).flatMap(new GroupedEchoExecutor()).parallelism(2)
//env.execute
}
object StormRunnerWithKeyValueSpoutRenameOutputFields extends Application{
val config : Config = ConfigFactory.load;
- val env = ExecutionEnvironmentFactory.getStorm(config)
- env.newSource(TestKeyValueSpout()).renameOutputFields(2).groupBy(0).flatMap(new GroupedEchoExecutor()).withParallelism(2)
+ val env = ExecutionEnvironments.getStorm(config)
+ env.fromSpout(TestKeyValueSpout()).withOutputFields(2).groupBy(0).flatMap(new GroupedEchoExecutor()).parallelism(2)
//env.execute
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
new file mode 100644
index 0000000..cf95304
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.datastream
+
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment
+
+/**
+ * @since 12/4/15
+ */
+case class Entity(name:String,value:Double,var inc:Int=0)
+
+object TestIterableWithGroupBy extends App {
+
+ val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
+
+ val tuples = Seq(
+ Entity("a", 1),
+ Entity("a", 2),
+ Entity("a", 3),
+ Entity("b", 2),
+ Entity("c", 3),
+ Entity("d", 3)
+ )
+
+ env.from(tuples)
+ .groupByKey(_.name)
+ .map(o => {o.inc += 2;o})
+ .filter(_.name != "b")
+ .filter(_.name != "c")
+ .groupByKey(o=>(o.name,o.value))
+ .map(o => (o.name,o))
+ .map(o => (o._1,o._2.value,o._2.inc))
+ .foreach(println)
+
+ env.execute()
+}
+
+object TestIterableWithGroupByCircularly extends App{
+ val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
+ val tuples = Seq(
+ Entity("a", 1),
+ Entity("a", 2),
+ Entity("a", 3),
+ Entity("b", 2),
+ Entity("c", 3),
+ Entity("d", 3)
+ )
+ env.from(tuples,recycle = true)
+ .map(o => {o.inc += 2;o})
+ .groupByKey(_.name)
+ .foreach(println)
+ env.execute()
+}
+
+object TestGroupByKeyOnSpoutproxy extends App{
+ val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
+
+ val tuples = Seq(
+ Entity("a", 1),
+ Entity("a", 2),
+ Entity("a", 3),
+ Entity("b", 2),
+ Entity("c", 3),
+ Entity("d", 3)
+ )
+
+ env.fromSpout[String](TestSpout())
+ .groupByKey(_.charAt(0))
+ .foreach(println)
+ env.execute()
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java
new file mode 100644
index 0000000..7e66478
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaMapper.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream;
+
+import java.util.List;
+
+public interface JavaMapper {
+ List<Object> map(List<Object> input);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java
new file mode 100644
index 0000000..39ce6c2
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/datastream/JavaTypeCompatible.java
@@ -0,0 +1,24 @@
+package org.apache.eagle.datastream;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @since 12/8/15
+ */
+public interface JavaTypeCompatible {
+ Class<?> getType();
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
index 5c89a41..63aa4fb 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/EagleTuple.scala
@@ -21,26 +21,18 @@ trait EagleTuple extends Serializable{
}
case class Tuple1[T0](f0 : T0) extends EagleTuple{
- override def getList : List[AnyRef] = {
- return List(f0.asInstanceOf[AnyRef])
- }
+ override def getList : List[AnyRef] = List(f0.asInstanceOf[AnyRef])
+
}
case class Tuple2[T0, T1](f0 : T0, f1: T1) extends EagleTuple{
- override def getList : List[AnyRef] = {
- return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef])
- }
+ override def getList : List[AnyRef] = List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef])
}
case class Tuple3[T0, T1, T2](f0 : T0, f1: T1, f2: T2) extends EagleTuple{
- override def getList : List[AnyRef] = {
- return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef])
- }
+ override def getList : List[AnyRef] = List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef])
}
case class Tuple4[T0, T1, T2, T3](f0 : T0, f1: T1, f2: T2, f3 : T3) extends EagleTuple{
- override def getList : List[AnyRef] = {
- return List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef], f3.asInstanceOf[AnyRef])
- }
-}
-
+ override def getList : List[AnyRef] = List(f0.asInstanceOf[AnyRef], f1.asInstanceOf[AnyRef], f2.asInstanceOf[AnyRef], f3.asInstanceOf[AnyRef])
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
index 22e063d..8569ae5 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
@@ -16,6 +16,6 @@
*/
package org.apache.eagle.datastream
-trait FlatMapper[T, R] extends Serializable{
- def flatMap(input : T, collector : Collector[R])
+trait FlatMapper[T] extends Serializable {
+ def flatMap(input : Seq[AnyRef], collector : Collector[T])
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
index b3d77e9..22ae213 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/StormStreamExecutor.scala
@@ -17,18 +17,23 @@
package org.apache.eagle.datastream
import com.typesafe.config.Config
+import scala.collection.JavaConverters._
-trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
+trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[R] {
def prepareConfig(config : Config)
def init
def fields : Array[String]
}
-trait JavaStormStreamExecutor[R <: EagleTuple] extends FlatMapper[java.util.List[AnyRef], R] {
+trait JavaStormStreamExecutor[R <: EagleTuple] extends FlatMapper[R] {
def prepareConfig(config : Config)
def init
def fields : Array[String]
override def toString() = this.getClass.getSimpleName
+
+ override def flatMap(input : Seq[AnyRef], collector : Collector[R]) = flatMap(input.asJava,collector)
+
+ def flatMap(input : java.util.List[AnyRef], collector : Collector[R])
}
abstract class StormStreamExecutor1[T0] extends StormStreamExecutor[Tuple1[T0]] {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
index d9363e8..2b2b1ac 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/EagleServiceConnector.java
@@ -20,10 +20,12 @@ package org.apache.eagle.service.client;
import com.typesafe.config.Config;
import org.apache.eagle.common.config.EagleConfigConstants;
+import java.io.Serializable;
+
/**
* Some common codes to enable DAO through eagle service including service host/post, credential population etc.
*/
-public class EagleServiceConnector {
+public class EagleServiceConnector implements Serializable{
private final String eagleServiceHost;
private final Integer eagleServicePort;
private String username;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
index 218b812..4eeda05 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
@@ -19,12 +19,10 @@ package org.apache.eagle.metric.kafka;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.base.BaseRichSpout;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
-import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
-import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.BrokerHosts;
@@ -42,13 +40,8 @@ public class EagleMetricCollectorMain {
private static final Logger LOG = LoggerFactory.getLogger(EagleMetricCollectorMain.class);
public static void main(String[] args) throws Exception {
- new ConfigOptionParser().load(args);
- //System.setProperty("config.resource", "/application.local.conf");
-
- Config config = ConfigFactory.load();
-
- StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
-
+ StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
+ Config config = env.getConfig();
String deserClsName = config.getString("dataSourceConfig.deserializerClass");
final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
@Override
@@ -60,6 +53,8 @@ public class EagleMetricCollectorMain {
}
};
+
+ // TODO: Refactor the anonymous classes into independent class files to avoid overly complex logic in the main method
KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() {
@Override
public BaseRichSpout getSpout(Config context) {
@@ -119,9 +114,9 @@ public class EagleMetricCollectorMain {
}
};
- env.newSource(new KafkaOffsetSourceSpoutProvider().getSpout(config)).renameOutputFields(0).withName("kafkaLogLagChecker");
- env.newSource(kafkaMessageSpoutProvider.getSpout(config)).renameOutputFields(2).withName("kafkaMessageFetcher").groupBy(Arrays.asList(0))
+ env.fromSpout(new KafkaOffsetSourceSpoutProvider()).withOutputFields(0).nameAs("kafkaLogLagChecker");
+ env.fromSpout(kafkaMessageSpoutProvider).withOutputFields(2).nameAs("kafkaMessageFetcher").groupBy(Arrays.asList(0))
.flatMap(new KafkaMessageDistributionExecutor());
env.execute();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
index 5fd02fd..8f25564 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
@@ -19,14 +19,11 @@ package org.apache.eagle.metric.kafka;
import backtype.storm.topology.base.BaseRichSpout;
import com.typesafe.config.Config;
import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
import org.apache.eagle.service.client.ServiceConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class KafkaOffsetSourceSpoutProvider {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetSourceSpoutProvider.class);
-
+public class KafkaOffsetSourceSpoutProvider implements StormSpoutProvider {
public BaseRichSpout getSpout(Config config){
ZKStateConfig zkStateConfig = new ZKStateConfig();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java
index e4e63c9..00b6ca7 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java
@@ -49,4 +49,4 @@ public class FileSensitivityAPIEntity extends TaggedLogAPIEntity{
this.sensitivityType = sensitivityType;
valueChanged("sensitivityType");
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
index e808502..7d32091 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
@@ -20,7 +20,6 @@
package org.apache.eagle.security.partition;
import com.sun.jersey.api.client.WebResource;
-import org.apache.commons.lang.time.DateUtils;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.metric.MetricConstants;
import org.apache.eagle.partition.DataDistributionDao;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogProcessorMain.java b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogProcessorMain.java
index 1a90cbd..b00f39c 100644
--- a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogProcessorMain.java
+++ b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogProcessorMain.java
@@ -17,32 +17,17 @@
*/
package org.apache.eagle.security.hbase;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigRenderOptions;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
-import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
-import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
import org.apache.eagle.security.hbase.sensitivity.HbaseResourceSensitivityDataJoinExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class HbaseAuditLogProcessorMain {
- private static final Logger LOG = LoggerFactory.getLogger(HbaseAuditLogProcessorMain.class);
-
public static void main(String[] args) throws Exception{
- Config config = new ConfigOptionParser().load(args);
-
- LOG.info("Config class: " + config.getClass().getCanonicalName());
-
- if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
-
- StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
- env.newSource(new KafkaSourcedSpoutProvider().getSpout(config)).renameOutputFields(1).withName("kafkaMsgConsumer")
+ StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
+ env.fromSpout(new KafkaSourcedSpoutProvider()).withOutputFields(1).nameAs("kafkaMsgConsumer")
.flatMap(new HbaseResourceSensitivityDataJoinExecutor())
.alertWithConsumer("hbaseSecurityLogEventStream", "hbaseSecurityLogAlertExecutor");
env.execute();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
index 2e8b852..5bb2aff 100644
--- a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
+++ b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
@@ -55,7 +55,6 @@ public class HbaseResourceSensitivityDataJoinExecutor extends JavaStormStreamExe
}
}
-
@Override
public void flatMap(List<Object> input, Collector<Tuple2<String, Map>> outputCollector){
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/run_auditlog_topology.sh
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/run_auditlog_topology.sh b/eagle-security/eagle-security-hdfs-auditlog/run_auditlog_topology.sh
index 8e8d969..38d8ce9 100755
--- a/eagle-security/eagle-security-hdfs-auditlog/run_auditlog_topology.sh
+++ b/eagle-security/eagle-security-hdfs-auditlog/run_auditlog_topology.sh
@@ -1,2 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
export JAVA_HOME=$(/usr/libexec/java_home -v 1.7)
mvn -X exec:java -Dexec.mainClass="eagle.security.auditlog.HdfsAuditLogProcessorMain"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/run_hostname_lookkup.sh
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/run_hostname_lookkup.sh b/eagle-security/eagle-security-hdfs-auditlog/run_hostname_lookkup.sh
index ba761de..fffd8c0 100755
--- a/eagle-security/eagle-security-hdfs-auditlog/run_hostname_lookkup.sh
+++ b/eagle-security/eagle-security-hdfs-auditlog/run_hostname_lookkup.sh
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
### refer to http://mojo.codehaus.org/exec-maven-plugin/usage.html
### java goal is to execute program within maven JVM
export JAVA_HOME=$(/usr/libexec/java_home -v 1.7)
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/run_message_producer.sh
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/run_message_producer.sh b/eagle-security/eagle-security-hdfs-auditlog/run_message_producer.sh
index 9ecaa26..dbe50f3 100755
--- a/eagle-security/eagle-security-hdfs-auditlog/run_message_producer.sh
+++ b/eagle-security/eagle-security-hdfs-auditlog/run_message_producer.sh
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
export JAVA_HOME=$(/usr/libexec/java_home -v 1.7)
### mvn -X exec:java -Dexec.args="-input /Users/user1/Downloads/hdfs-audit.log.2015-01-22-22 -maxNum 10000000 -topic hdfs_audit_log" -Pproducer
mvn exec:java -Dexec.mainClass="eagle.security.auditlog.util.AuditLogKafkaProducer" -Dexec.args="-input /Users/user1/Downloads/hdfs-audit.log.2015-01-22-22 -maxNum 20000 -topic hdfs_audit_log"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/run_message_producer_in_assembly.sh
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/run_message_producer_in_assembly.sh b/eagle-security/eagle-security-hdfs-auditlog/run_message_producer_in_assembly.sh
index 20d7cae..128926d 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/run_message_producer_in_assembly.sh
+++ b/eagle-security/eagle-security-hdfs-auditlog/run_message_producer_in_assembly.sh
@@ -1 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
java -cp target/eagle-app-security-activity-monitoring-dataproc-1.0.0-jar-with-dependencies.jar:/Users/user1/.m2/repository/org/slf4j/slf4j-api/1.4.3/slf4j-api-1.4.3.jar:/Users/user1/.m2/repository/org/slf4j/slf4j-log4j12/1.4.3/slf4j-log4j12-1.4.3.jar eagle.app.security.dataproc.util.AuditLogKafkaProducer -input /Users/user1/projects/eagle-app-security-activity-monitoring/eagle-app-security-activity-monitoring-dataproc/src/main/resources/auditlog/auditlog.1 --topic hdfs_audit_log
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/src/assembly/eagle-dam-auditlog-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/assembly/eagle-dam-auditlog-assembly.xml b/eagle-security/eagle-security-hdfs-auditlog/src/assembly/eagle-dam-auditlog-assembly.xml
index f3b17d4..0acf619 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/assembly/eagle-dam-auditlog-assembly.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/assembly/eagle-dam-auditlog-assembly.xml
@@ -1,3 +1,20 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
index eb84fd3..f63a9be 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
@@ -20,32 +20,27 @@ package org.apache.eagle.security.auditlog;
import backtype.storm.spout.SchemeAsMultiScheme;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
-import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
-import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.core.StreamProducer;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
import org.apache.eagle.partition.DataDistributionDao;
import org.apache.eagle.partition.PartitionAlgorithm;
import org.apache.eagle.partition.PartitionStrategy;
import org.apache.eagle.partition.PartitionStrategyImpl;
import org.apache.eagle.security.partition.DataDistributionDaoImpl;
import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
-import org.apache.eagle.datastream.StreamProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class HdfsAuditLogProcessorMain {
- private static final Logger LOG = LoggerFactory.getLogger(HdfsAuditLogProcessorMain.class);
-
public static PartitionStrategy createStrategy(Config config) {
+ // TODO: Refactor the configuration structure to avoid repeated config processing ~ hao
String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
@@ -72,6 +67,7 @@ public class HdfsAuditLogProcessorMain {
return Arrays.asList(map.get("user"), tmp);
}
};
+
KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
@Override
public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
@@ -81,8 +77,9 @@ public class HdfsAuditLogProcessorMain {
return provider;
}
- public static void execWithDefaultPartition(Config config, StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
- StreamProducer source = env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").groupBy(Arrays.asList(0));
+ @SuppressWarnings("unchecked")
+ public static void execWithDefaultPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
+ StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(Arrays.asList(0));
StreamProducer reassembler = source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0));
source.streamUnion(reassembler)
.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
@@ -91,9 +88,10 @@ public class HdfsAuditLogProcessorMain {
env.execute();
}
- public static void execWithBalancedPartition(Config config, StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
- PartitionStrategy strategy = createStrategy(config);
- StreamProducer source = env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").customGroupBy(strategy);
+ @SuppressWarnings("unchecked")
+ public static void execWithBalancedPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
+ PartitionStrategy strategy = createStrategy(env.getConfig());
+ StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(strategy);
StreamProducer reassembler = source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0));
source.streamUnion(reassembler)
.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
@@ -103,18 +101,14 @@ public class HdfsAuditLogProcessorMain {
}
public static void main(String[] args) throws Exception{
- Config config = new ConfigOptionParser().load(args);
- LOG.info("Config class: " + config.getClass().getCanonicalName());
- if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
-
- StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
- KafkaSourcedSpoutProvider provider = createProvider(config);
- Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") ? config.getBoolean("eagleProps.balancePartitionEnabled") : false;
+ StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
+ Config config = env.getConfig();
+ KafkaSourcedSpoutProvider provider = createProvider(env.getConfig());
+ Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") && config.getBoolean("eagleProps.balancePartitionEnabled");
if (balancePartition) {
- execWithBalancedPartition(config, env, provider);
- }
- else {
- execWithDefaultPartition(config, env, provider);
+ execWithBalancedPartition(env, provider);
+ } else {
+ execWithDefaultPartition(env, provider);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
index c6894dc..c2d50db 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
@@ -36,9 +36,6 @@ import org.wso2.siddhi.core.stream.input.InputHandler;
import java.util.*;
-/**
- * Created by yonzhang on 11/20/15.
- */
public class HdfsUserCommandReassembler extends JavaStormStreamExecutor2<String, Map> {
private static final Logger LOG = LoggerFactory.getLogger(HdfsUserCommandReassembler.class);
private Config config;
@@ -142,14 +139,14 @@ public class HdfsUserCommandReassembler extends JavaStormStreamExecutor2<String,
@Override
public void flatMap(List<Object> input, Collector<Tuple2<String, Map>> collector) {
- LOG.debug("incoming event:" + input.get(1));
- SortedMap<String, Object> toBeCopied = (SortedMap<String, Object>)input.get(1);
- SortedMap<String, Object> event = new TreeMap<String, Object>(toBeCopied);
+ if(LOG.isDebugEnabled()) LOG.debug("incoming event:" + input.get(1));
+ SortedMap<String, Object> toBeCopied = (SortedMap<String, Object>) input.get(1);
+ SortedMap<String, Object> event = new TreeMap<>(toBeCopied);
Object[] siddhiEvent = convertToSiddhiEvent(collector, event);
try {
inputHandler.send(siddhiEvent);
- }catch(Exception ex){
- LOG.error("fail sending event to Siddhi pattern engine", ex);
+ } catch (Exception ex){
+ LOG.error("Fail sending event to Siddhi pattern engine", ex);
throw new IllegalStateException(ex);
}
}
@@ -163,4 +160,4 @@ public class HdfsUserCommandReassembler extends JavaStormStreamExecutor2<String,
}
return siddhiEvent;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index 1d13082..5a82fda 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -55,7 +55,7 @@
#"kafkaStatisticRangeInMin" : 60,
"eagleService": {
"host": "localhost",
- "port": 9099,
+ "port": 38080,
"username": "admin",
"password": "secret"
},
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/auditlog/auditlog.1
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/auditlog/auditlog.1 b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/auditlog/auditlog.1
deleted file mode 100644
index 7d470f4..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/auditlog/auditlog.1
+++ /dev/null
@@ -1,16 +0,0 @@
-2015-09-21 21:36:52,172 INFO FSNamesystem.audit: allowed=true ugi=ambari-qa (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp/hive dst=null perm=null proto=rpc
-2015-09-21 21:36:52,268 INFO FSNamesystem.audit: allowed=true ugi=ambari-qa (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp/hive dst=null perm=null proto=rpc
-2015-09-21 21:36:52,274 INFO FSNamesystem.audit: allowed=true ugi=ambari-qa (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp/hive/ambari-qa dst=null perm=null proto=rpc
-2015-09-21 21:36:52,430 INFO FSNamesystem.audit: allowed=true ugi=ambari-qa (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929 dst=null perm=null proto=rpc
-2015-09-21 21:36:52,437 INFO FSNamesystem.audit: allowed=true ugi=ambari-qa (auth:SIMPLE) ip=/10.0.2.15 cmd=mkdirs src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929 dst=null perm=ambari-qa:hdfs:rwx------ proto=rpc
-2015-09-21 21:36:52,443 INFO FSNamesystem.audit: allowed=true ugi=ambari-qa (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929 dst=null perm=null proto=rpc
-2015-09-21 21:36:52,448 INFO FSNamesystem.audit: allowed=true ugi=ambari-qa (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929/_tmp_space.db dst=null perm=null proto=rpc
-2015-09-21 21:36:52,451 INFO FSNamesystem.audit: allowed=true ugi=ambari-qa (auth:SIMPLE) ip=/10.0.2.15 cmd=mkdirs src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929/_tmp_space.db dst=null perm=ambari-qa:hdfs:rwx------ proto=rpc
-2015-09-21 21:36:52,453 INFO FSNamesystem.audit: allowed=true ugi=ambari-qa (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929/_tmp_space.db dst=null perm=null proto=rpc
-2015-09-21 21:36:55,455 INFO FSNamesystem.audit: allowed=true ugi=ambari-qa (auth:SIMPLE) ip=/10.0.2.15 cmd=delete src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929 dst=null perm=null proto=rpc
-2015-09-21 21:36:55,477 INFO FSNamesystem.audit: allowed=true ugi=ambari-qa (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929 dst=null perm=null proto=rpc
-2015-09-21 21:36:55,480 INFO FSNamesystem.audit: allowed=true ugi=ambari-qa (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp/hive/ambari-qa/e7b21624-afca-473d-99ef-847614a46929/_tmp_space.db dst=null perm=null proto=rpc
-2015-09-21 21:37:20,809 INFO FSNamesystem.audit: allowed=true ugi=oozie (auth:SIMPLE) ip=/10.0.2.15 cmd=listStatus src=/user/oozie/share/lib dst=null perm=null proto=rpc
-2015-09-21 21:37:24,330 INFO FSNamesystem.audit: allowed=false ugi=hive (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/ranger/audit/hiveServer2/20150921/sandbox.hortonworks.com-audit.log dst=null perm=null proto=rpc
-2015-09-21 21:37:24,334 INFO FSNamesystem.audit: allowed=false ugi=hive (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/ranger/audit/hiveServer2/20150921/sandbox.hortonworks.com-audit-1.log dst=null perm=null proto=rpc
-2015-09-21 21:37:24,338 INFO FSNamesystem.audit: allowed=false ugi=hive (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/ranger/audit/hiveServer2/20150921 dst=null perm=null proto=rpc
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
index 16368cd..a68a323 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
topology.workers: 1
topology.acker.executors: 1
topology.tasks: 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-securitylog/src/main/java/org/apache/eagle/security/securitylog/HDFSSecurityLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-securitylog/src/main/java/org/apache/eagle/security/securitylog/HDFSSecurityLogProcessorMain.java b/eagle-security/eagle-security-hdfs-securitylog/src/main/java/org/apache/eagle/security/securitylog/HDFSSecurityLogProcessorMain.java
index 9238cad..07bdd3b 100644
--- a/eagle-security/eagle-security-hdfs-securitylog/src/main/java/org/apache/eagle/security/securitylog/HDFSSecurityLogProcessorMain.java
+++ b/eagle-security/eagle-security-hdfs-securitylog/src/main/java/org/apache/eagle/security/securitylog/HDFSSecurityLogProcessorMain.java
@@ -17,32 +17,15 @@
*/
package org.apache.eagle.security.securitylog;
-
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigRenderOptions;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
-import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
-import org.apache.eagle.datastream.StormExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
public class HDFSSecurityLogProcessorMain {
- private static final Logger LOG = LoggerFactory.getLogger(HDFSSecurityLogProcessorMain.class);
-
public static void main(String[] args) throws Exception{
- Config config = new ConfigOptionParser().load(args);
-
- LOG.info("Config class: " + config.getClass().getCanonicalName());
-
- if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
-
- StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
-
- env.newSource(new KafkaSourcedSpoutProvider().getSpout(config)).renameOutputFields(1).withName("kafkaMsgConsumer")
+ StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
+ env.fromSpout(new KafkaSourcedSpoutProvider()).withOutputFields(1).nameAs("kafkaMsgConsumer")
.alertWithConsumer("hdfsSecurityLogEventStream", "hdfsSecurityLogAlertExecutor");
env.execute();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSFileSystem.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSFileSystem.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSFileSystem.java
index 9a17075..67e4092 100644
--- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSFileSystem.java
+++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSFileSystem.java
@@ -1,12 +1,13 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,