You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/08 00:46:15 UTC
[1/3] incubator-eagle git commit: HBase audit monitoring with new app
framework https://issues.apache.org/jira/browse/EAGLE-420 Author: Yong Zhang
Reviewer: Hao Chen
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 1d842563a -> 660bfbd3f
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/metadata.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/metadata.xml b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/metadata.xml
new file mode 100644
index 0000000..665b5d7
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/metadata.xml
@@ -0,0 +1,243 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ /*
+ ~ * Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ * contributor license agreements. See the NOTICE file distributed with
+ ~ * this work for additional information regarding copyright ownership.
+ ~ * The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ * (the "License"); you may not use this file except in compliance with
+ ~ * the License. You may obtain a copy of the License at
+ ~ * <p/>
+ ~ * http://www.apache.org/licenses/LICENSE-2.0
+ ~ * <p/>
+ ~ * Unless required by applicable law or agreed to in writing, software
+ ~ * distributed under the License is distributed on an "AS IS" BASIS,
+ ~ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ * See the License for the specific language governing permissions and
+ ~ * limitations under the License.
+ ~ */
+ -->
+
+<application>
+ <type>HBaseAuditLogApplication</type>
+ <name>HBase Audit Log Monitoring Application</name>
+ <version>0.5.0-incubating</version>
+ <appClass>org.apache.eagle.security.hbase.HBaseAuditLogApplication</appClass>
+ <viewPath>/apps/example</viewPath>
+ <configuration>
+ <property>
+ <name>dataSourceConfig.topic</name>
+ <displayName>dataSourceConfig.topic</displayName>
+ <value>sandbox_hbase_audit_log</value>
+ <description>data source topic</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkConnection</name>
+ <displayName>dataSourceConfig.zkConnection</displayName>
+ <value>localhost</value>
+ <description>zk connection</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkConnectionTimeoutMS</name>
+ <displayName>dataSourceConfig.zkConnectionTimeoutMS</displayName>
+ <value>15000</value>
+ <description>zk connection timeout in milliseconds</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.fetchSize</name>
+ <displayName>dataSourceConfig.fetchSize</displayName>
+ <value>1048586</value>
+ <description>kafka fetch size</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionZKServers</name>
+ <displayName>dataSourceConfig.transactionZKServers</displayName>
+ <value>localhost</value>
+ <description>zookeeper server for offset transaction</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionZKPort</name>
+ <displayName>dataSourceConfig.transactionZKPort</displayName>
+ <value>2181</value>
+ <description>zookeeper server port for offset transaction</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionZKRoot</name>
+ <displayName>dataSourceConfig.transactionZKRoot</displayName>
+ <value>/consumers</value>
+ <description>offset transaction root</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.consumerGroupId</name>
+ <displayName>dataSourceConfig.consumerGroupId</displayName>
+ <value>eagle.hbaseaudit.consumer</value>
+ <description>kafka consumer group Id</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionStateUpdateMS</name>
+ <displayName>dataSourceConfig.transactionStateUpdateMS</displayName>
+ <value>2000</value>
+ <description>zk state update interval in milliseconds</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.schemeCls</name>
+ <displayName>dataSourceConfig.schemeCls</displayName>
+ <value>storm.kafka.StringScheme</value>
+ <description>scheme class</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionZKPort</name>
+ <displayName>dataSourceConfig.transactionZKPort</displayName>
+ <value>2181</value>
+ <description>zookeeper server port for offset transaction</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionZKPort</name>
+ <displayName>dataSourceConfig.transactionZKPort</displayName>
+ <value>2181</value>
+ <description>zookeeper server port for offset transaction</description>
+ </property>
+ <property>
+ <name>topology.numOfSpoutTasks</name>
+ <displayName>topology.numOfSpoutTasks</displayName>
+ <value>2</value>
+ <description>number of spout tasks</description>
+ </property>
+ <property>
+ <name>topology.numOfParserTasks</name>
+ <displayName>topology.numOfParserTasks</displayName>
+ <value>2</value>
+ <description>number of parser tasks</description>
+ </property>
+ <property>
+ <name>topology.numOfJoinTasks</name>
+ <displayName>topology.numOfJoinTasks</displayName>
+ <value>2</value>
+ <description>number of external join tasks</description>
+ </property>
+ <property>
+ <name>topology.numOfSinkTasks</name>
+ <displayName>topology.numOfSinkTasks</displayName>
+ <value>2</value>
+ <description>number of sink tasks</description>
+ </property>
+ <property>
+ <name>eagleProps.dataJoinPollIntervalSec</name>
+ <displayName>eagleProps.dataJoinPollIntervalSec</displayName>
+ <value>30</value>
+ <description>interval in seconds for polling</description>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.host</name>
+ <displayName>eagleProps.eagleService.host</displayName>
+ <value>localhost</value>
+ <description>eagle service host</description>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.port</name>
+ <displayName>eagleProps.eagleService.port</displayName>
+ <value>8080</value>
+ <description>eagle service port</description>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.username</name>
+ <displayName>eagleProps.eagleService.username</displayName>
+ <value>admin</value>
+ <description>eagle service username</description>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.password</name>
+ <displayName>eagleProps.eagleService.password</displayName>
+ <value>secret</value>
+ <description>eagle service password</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.topic</name>
+ <displayName>dataSinkConfig.topic</displayName>
+ <value>sandbox_hbase_audit_log_parsed</value>
+ <description>topic for kafka data sink</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.brokerList</name>
+ <displayName>dataSinkConfig.brokerList</displayName>
+ <value>sandbox.hortonworks.com:6667</value>
+ <description>kafka broker list</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.serializerClass</name>
+ <displayName>dataSinkConfig.serializerClass</displayName>
+ <value>kafka.serializer.StringEncoder</value>
+ <description>serializer class Kafka message value</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.keySerializerClass</name>
+ <displayName>dataSinkConfig.keySerializerClass</displayName>
+ <value>kafka.serializer.StringEncoder</value>
+ <description>serializer class Kafka message key</description>
+ </property>
+ <property>
+ <name>metadata.store</name>
+ <displayName>metadata.store</displayName>
+ <value>org.apache.eagle.security.service.InMemMetadataDaoImpl</value>
+ <description>implementation class for metadata store</description>
+ </property>
+ </configuration>
+ <streams>
+ <stream>
+ <streamId>hbase_audit_log_stream</streamId>
+ <description>HBase Audit Log Stream</description>
+ <validate>true</validate>
+ <timeseries>true</timeseries>
+ <columns>
+ <column>
+ <name>action</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>host</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>status</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>timestamp</name>
+ <type>long</type>
+ </column>
+ </columns>
+ </stream>
+ </streams>
+ <docs>
+ <install>
+# Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --partitions 1
+
+# Step 2: Set up data collector to flow data into kafka topic
+
+./bin/logstash -f log_collector.conf
+
+## `log_collector.conf` sample as following:
+
+input {
+
+}
+filter {
+
+}
+output{
+
+}
+
+# Step 3: start application
+
+# Step 4: monitor with featured portal or alert with policies
+ </install>
+ <uninstall>
+# Step 1: stop and uninstall application
+# Step 2: delete kafka topic named "${site}_example_source_topic"
+# Step 3: stop logstash
+ </uninstall>
+ </docs>
+</application>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..e96f225
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,35 @@
+#
+# /*
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements. See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License. You may obtain a copy of the License at
+# * <p/>
+# * http://www.apache.org/licenses/LICENSE-2.0
+# * <p/>
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+#
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.security.hbase.HBaseAuditLogAppProvider
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf
index bd38c83..5990450 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf
@@ -14,8 +14,10 @@
# limitations under the License.
{
+ "appId" : "HBaseAuditLogApp",
+ "mode" : "LOCAL",
+ "siteId" : "testsite",
"topology" : {
- "localMode" : true,
"numOfTotalWorkers" : 2,
"numOfSpoutTasks" : 2,
"numOfParserTasks" : 2,
@@ -39,7 +41,7 @@
"dataJoinPollIntervalSec" : 30,
"eagleService": {
"host": "localhost",
- "port": 58080
+ "port": 9090
"username": "admin",
"password": "secret"
}
@@ -50,4 +52,4 @@
"serializerClass" : "kafka.serializer.StringEncoder",
"keySerializerClass" : "kafka.serializer.StringEncoder"
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf.bak
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf.bak b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf.bak
deleted file mode 100644
index 5c44574..0000000
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf.bak
+++ /dev/null
@@ -1,66 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
- "envContextConfig" : {
- "env" : "storm",
- "mode" : "local",
- "topologyName" : "sandbox-hbaseSecurityLog-topology",
- "stormConfigFile" : "security-auditlog-storm.yaml",
- "parallelismConfig" : {
- "kafkaMsgConsumer" : 1,
- "hbaseSecurityLogAlertExecutor*" : 1
- }
- },
- "dataSourceConfig": {
- "topic" : "sandbox_hbase_security_log",
- "zkConnection" : "sandbox.hortonworks.com:2181",
- "zkConnectionTimeoutMS" : 15000,
- "consumerGroupId" : "EagleConsumer",
- "fetchSize" : 1048586,
- "deserializerClass" : "org.apache.eagle.security.hbase.HbaseAuditLogKafkaDeserializer",
- "transactionZKServers" : "sandbox.hortonworks.com",
- "transactionZKPort" : 2181,
- "transactionZKRoot" : "/consumers",
- "consumerGroupId" : "eagle.hbasesecurity.consumer",
- "transactionStateUpdateMS" : 2000
- },
- "alertExecutorConfigs" : {
- "hbaseSecurityLogAlertExecutor" : {
- "parallelism" : 1,
- "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
- "needValidation" : "true"
- }
- },
- "eagleProps" : {
- "site" : "sandbox",
- "application": "hbaseSecurityLog",
- "dataJoinPollIntervalSec" : 30,
- "mailHost" : "mailHost.com",
- "mailSmtpPort":"25",
- "mailDebug" : "true",
- "eagleService": {
- "host": "localhost",
- "port": 9098
- "username": "admin",
- "password": "secret"
- }
- },
- "dynamicConfigSource" : {
- "enabled" : true,
- "initDelayMillis" : 0,
- "delayMillis" : 30000
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml
deleted file mode 100644
index 4b48c0a..0000000
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml
+++ /dev/null
@@ -1,91 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<application>
- <type>HBaseAuditLogApplication</type>
- <name>HBase Audit Log Monitoring Application</name>
- <version>0.5.0-incubating</version>
- <appClass>org.apache.eagle.security.hbase.HBaseAuditLogApplication</appClass>
- <viewPath>/apps/example</viewPath>
- <configuration>
- <property>
- <name>message</name>
- <displayName>Message</displayName>
- <value>Hello, example application!</value>
- <description>Just an sample configuration property</description>
- </property>
- </configuration>
- <streams>
- <stream>
- <streamId>hbase_audit_log_stream</streamId>
- <description>HBase Audit Log Stream</description>
- <validate>true</validate>
- <timeseries>true</timeseries>
- <columns>
- <column>
- <name>action</name>
- <type>string</type>
- </column>
- <column>
- <name>host</name>
- <type>string</type>
- </column>
- <column>
- <name>status</name>
- <type>string</type>
- </column>
- <column>
- <name>timestamp</name>
- <type>long</type>
- </column>
- </columns>
- </stream>
- </streams>
- <docs>
- <install>
-# Step 1: Create source kafka topic named "${site}_example_source_topic"
-
-./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --replication 1
-
-# Step 2: Set up data collector to flow data into kafka topic in
-
-./bin/logstash -f log_collector.conf
-
-## `log_collector.conf` sample as following:
-
-input {
-
-}
-filter {
-
-}
-output{
-
-}
-
-# Step 3: start application
-
-# Step 4: monitor with featured portal or alert with policies
- </install>
- <uninstall>
-# Step 1: stop and uninstall application
-# Step 2: delete kafka topic named "${site}_example_source_topic"
-# Step 3: stop logstash
- </uninstall>
- </docs>
-</application>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/resources/scripts
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/scripts b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/scripts
new file mode 100644
index 0000000..7d04fdc
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/scripts
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+./kafka-topics.sh --topic sandbox_hbase_audit_log --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2
+
+./kafka-topics.sh --topic sandbox_hbase_audit_log_parsed --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2
+
+./kafka-console-producer.sh --topic sandbox_hbase_audit_log --broker-list sandbox.hortonworks.com:6667
+
+./kafka-console-consumer.sh --topic sandbox_hbase_audit_log_parsed --zookeeper sandbox.hortonworks.com:2181 --from-beginning
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/test/java/org/apache/eagle/security/hbase/TestHbaseAuditLogProcessTopology.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/test/java/org/apache/eagle/security/hbase/TestHbaseAuditLogProcessTopology.java b/eagle-security/eagle-security-hbase-auditlog/src/test/java/org/apache/eagle/security/hbase/TestHbaseAuditLogProcessTopology.java
deleted file mode 100644
index 6abc966..0000000
--- a/eagle-security/eagle-security-hbase-auditlog/src/test/java/org/apache/eagle/security/hbase/TestHbaseAuditLogProcessTopology.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.security.hbase;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import com.typesafe.config.ConfigSyntax;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.junit.Test;
-
-
-public class TestHbaseAuditLogProcessTopology {
- @Test
- public void test() throws Exception {
- //Config baseConfig = ConfigFactory.load("eagle-scheduler.conf");
- ConfigParseOptions options = ConfigParseOptions.defaults()
- .setSyntax(ConfigSyntax.PROPERTIES)
- .setAllowMissing(false);
- String topoConfigStr = "web.hbase.zookeeper.property.clientPort=2181\nweb.hbase.zookeeper.quorum=sandbox.hortonworks.com\n\napp.envContextConfig.env=storm\napp.envContextConfig.mode=local\napp.dataSourceConfig.topic=sandbox_hbase_security_log\napp.dataSourceConfig.zkConnection=sandbox.hortonworks.com:2181\napp.dataSourceConfig.zkConnectionTimeoutMS=15000\napp.dataSourceConfig.brokerZkPath=/brokers\napp.dataSourceConfig.fetchSize=1048586\napp.dataSourceConfig.transactionZKServers=sandbox.hortonworks.com\napp.dataSourceConfig.transactionZKPort=2181\napp.dataSourceConfig.transactionZKRoot=/consumers\napp.dataSourceConfig.consumerGroupId=eagle.hbasesecurity.consumer\napp.dataSourceConfig.transactionStateUpdateMS=2000\napp.dataSourceConfig.deserializerClass=org.apache.eagle.security.hbase.HbaseAuditLogKafkaDeserializer\napp.eagleProps.site=sandbox\napp.eagleProps.application=hbaseSecurityLog\napp.eagleProps.dataJoinPollIntervalSec=30\napp.eagleProps.mailHost=mailHost.com\napp.eagleProps.mailSmtpPort=25\napp.eagleProps.mailDebug=true\napp.eagleProps.eagleService.host=localhost\napp.eagleProps.eagleService.port=9098\napp.eagleProps.eagleService.username=admin\napp.eagleProps.eagleService.password=secret";
-
- Config topoConfig = ConfigFactory.parseString(topoConfigStr, options);
- Config conf = topoConfig.getConfig(EagleConfigConstants.APP_CONFIG);
-
- HbaseAuditLogMonitoringMain topology = new HbaseAuditLogMonitoringMain();
- //topology.submit("", conf);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/SensitivityMetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/SensitivityMetadataResource.java b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/SensitivityMetadataResource.java
new file mode 100644
index 0000000..7b85c69
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/SensitivityMetadataResource.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.eagle.service.security.hbase;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
+import org.apache.eagle.security.service.HBaseSensitivityEntity;
+import org.apache.eagle.security.service.ISecurityMetadataDAO;
+import org.apache.eagle.security.service.MetadataDaoFactory;
+
+import javax.ws.rs.*;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * JAX-RS resource rooted at "/metadata/sensitivity" that lists and adds HBase sensitivity entries through an ISecurityMetadataDAO. Since 6/10/16.
+ */
+@Path("/metadata/sensitivity")
+@Singleton
+public class SensitivityMetadataResource {
+ private ApplicationEntityService entityService; // assigned in the constructor; not read elsewhere in this class
+ private Config config; // assigned in the constructor; not read elsewhere in this class
+ private ISecurityMetadataDAO dao; // obtained once in the constructor and used by both endpoints below
+ @Inject
+ public SensitivityMetadataResource(ApplicationEntityService entityService, Config eagleServerConfig){
+ this.entityService = entityService;
+ this.config = eagleServerConfig;
+ String metadataStoreCls = eagleServerConfig.getString("metadata.store"); // DAO implementation class name is read from the "metadata.store" config key
+ dao = MetadataDaoFactory.getMetadataDAO(metadataStoreCls);
+ }
+ // GET on the "/hbase" sub-path: returns dao.listHBaseSensitivies() as application/json. NOTE(review): the "site" query parameter is accepted but never used here - confirm whether per-site filtering is intended.
+ @Path("/hbase")
+ @GET
+ @Produces("application/json")
+ public Collection<HBaseSensitivityEntity> getHBaseSensitivites(@QueryParam("site") String site){
+ return dao.listHBaseSensitivies();
+ }
+ // POST on the "/hbase" sub-path: accepts an application/json body and passes the collection to dao.addHBaseSensitivity(list).
+ @Path("/hbase")
+ @POST
+ @Consumes("application/json")
+ public void addHBaseSensitivities(Collection<HBaseSensitivityEntity> list){
+ dao.addHBaseSensitivity(list);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-server/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml
index 6617693..6083616 100644
--- a/eagle-server/pom.xml
+++ b/eagle-server/pom.xml
@@ -58,6 +58,10 @@
<groupId>com.sun.jersey</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -134,6 +138,16 @@
<artifactId>eagle-jpm-app</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-security-hbase-auditlog</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-security-hbase-web</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
<resources>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-server/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/application.conf b/eagle-server/src/main/resources/application.conf
index 19b87b2..ce75909 100644
--- a/eagle-server/src/main/resources/application.conf
+++ b/eagle-server/src/main/resources/application.conf
@@ -51,4 +51,4 @@
"nimbusThriftPort": 6627
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-server/src/main/resources/configuration.yml
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/configuration.yml b/eagle-server/src/main/resources/configuration.yml
new file mode 100644
index 0000000..c671ade
--- /dev/null
+++ b/eagle-server/src/main/resources/configuration.yml
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+server:
+ applicationConnectors:
+ - type: http
+ port: 9090
+ adminConnectors:
+ - type: http
+ port: 9091
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-server/src/test/java/org/apache/eagle/server/ServerApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-server/src/test/java/org/apache/eagle/server/ServerApplicationTest.java b/eagle-server/src/test/java/org/apache/eagle/server/ServerApplicationTest.java
index 1ef5127..823981f 100644
--- a/eagle-server/src/test/java/org/apache/eagle/server/ServerApplicationTest.java
+++ b/eagle-server/src/test/java/org/apache/eagle/server/ServerApplicationTest.java
@@ -21,6 +21,6 @@ import org.junit.Test;
public class ServerApplicationTest {
@Test
public void testServerMain() throws Exception {
- ServerMain.main(new String[]{"server"});
+ ServerMain.main(new String[]{"server", "src/test/resources/configuration.yml"});
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-server/src/test/resources/configuration.yml
----------------------------------------------------------------------
diff --git a/eagle-server/src/test/resources/configuration.yml b/eagle-server/src/test/resources/configuration.yml
new file mode 100644
index 0000000..c671ade
--- /dev/null
+++ b/eagle-server/src/test/resources/configuration.yml
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+server:
+ applicationConnectors:
+ - type: http
+ port: 9090
+ adminConnectors:
+ - type: http
+ port: 9091
[2/3] incubator-eagle git commit: HBase audit monitoring with new app
framework https://issues.apache.org/jira/browse/EAGLE-420 Author: Yong Zhang
Reviewer: Hao Chen
Posted by yo...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
index 7c64e50..60f49ef 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
@@ -1,46 +1,47 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-import java.lang.reflect.ParameterizedType;
-
-public interface StreamSinkProvider<S extends StreamSink<D>,D extends StreamSinkConfig>{
- /**
- * @param streamId
- * @param appConfig
- * @return
- */
- D getSinkConfig(String streamId, Configuration appConfig);
- S getSink();
-
- default S getSink(String streamId, Configuration appConfig){
- S s = getSink();
- s.init(streamId,getSinkConfig(streamId,appConfig));
- return s;
- }
-
- default Class<? extends S> getSinkType(){
- return (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
- }
-
- default Class<? extends D> getSinkConfigType(){
- return (Class<D>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
- }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+import java.lang.reflect.ParameterizedType;
+
+public interface StreamSinkProvider<S extends StreamSink<D>,D extends StreamSinkConfig>{
+ /**
+ * @param streamId id of the stream whose sink configuration is requested
+ * @param config configuration from which the sink configuration is derived
+ * @return the sink configuration that getSink(String, Config) passes to the sink's init
+ */
+ D getSinkConfig(String streamId, Config config);
+ S getSink(); // sink instance that getSink(String, Config) initializes before returning it
+
+ default S getSink(String streamId, Config config){ // obtain a sink, init it for streamId, hand it back
+ final S sink = getSink();
+ sink.init(streamId,getSinkConfig(streamId,config));
+ return sink;
+ }
+
+ default Class<? extends S> getSinkType(){ // NOTE(review): presumably expects a parameterized superclass; the cast below throws ClassCastException otherwise
+ return (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0]; // first type argument
+ }
+
+ default Class<? extends D> getSinkConfigType(){ // NOTE(review): same superclass assumption as getSinkType
+ return (Class<D>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1]; // second type argument
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
index 20816db..bf1e587 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
@@ -128,4 +128,4 @@ public abstract class AbstractApplicationProvider<T extends Application> impleme
public ApplicationDesc getApplicationDesc() {
return applicationDesc;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
index ace0c45..be84f0c 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
@@ -34,4 +34,4 @@ public interface ApplicationProvider<T extends Application> {
* @return application instance
*/
T getApplication();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
index 2c686d9..1ef91ff 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
@@ -54,8 +54,11 @@ public class ServerSimulatorImpl extends ServerSimulator {
SiteEntity siteEntity = getUniqueSite();
siteResource.createSite(siteEntity);
Assert.assertNotNull(siteEntity.getUuid());
+ ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL);
+ installOperation.setConfiguration(appConfig);
// Install application
- ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL)).getData();
+ ApplicationEntity applicationEntity =
+ applicationResource.installApplication(installOperation).getData();
// Start application
applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
}
@@ -86,4 +89,4 @@ public class ServerSimulatorImpl extends ServerSimulator {
throw new IllegalStateException(e.getMessage(),e);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
index 0558454..f58f0aa 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
@@ -1,84 +1,90 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.spi.AbstractApplicationProvider;
-import org.junit.Ignore;
-
-import java.util.Arrays;
-import java.util.Map;
-
-@Ignore
-public class TestStormApplication extends StormApplication<TestStormApplication.TestStormAppConfig>{
- @Override
- public StormTopology execute(TestStormAppConfig config, StormEnvironment environment) {
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("metric_spout", new RandomEventSpout(), config.getSpoutNum());
- builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
- builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
- return builder.createTopology();
- }
-
- public final static class TestStormAppConfig extends Configuration{
- private int spoutNum = 1;
-
- public int getSpoutNum() {
- return spoutNum;
- }
-
- public void setSpoutNum(int spoutNum) {
- this.spoutNum = spoutNum;
- }
- }
-
- private class RandomEventSpout extends BaseRichSpout {
- private SpoutOutputCollector _collector;
- @Override
- public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
- _collector = spoutOutputCollector;
- }
-
- @Override
- public void nextTuple() {
- _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
- _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
- }
- }
-
- public final static class Provider extends AbstractApplicationProvider<TestStormApplication> {
- public Provider(){
- super("TestApplicationMetadata.xml");
- }
- @Override
- public TestStormApplication getApplication() {
- return new TestStormApplication();
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+import org.junit.Ignore;
+
+import java.util.Arrays;
+import java.util.Map;
+
+@Ignore
+public class TestStormApplication extends StormApplication<TestStormApplication.TestStormAppConfig>{ // test application: one random metric spout feeding two stream sinks
+ @Override
+ public StormTopology execute(TestStormAppConfig config, StormEnvironment environment){
+ return null; // the typed-config overload builds nothing; the Config overload below creates the topology
+ }
+
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("metric_spout", new RandomEventSpout(), config.getInt("spoutNum")); // third argument is read from the "spoutNum" config key
+ builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric")); // sink for TEST_STREAM_1, grouped on "metric"
+ builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric")); // sink for TEST_STREAM_2, grouped on "metric"
+ return builder.createTopology();
+ }
+
+ public final static class TestStormAppConfig extends Configuration{ // typed configuration carrying only spoutNum
+ private int spoutNum = 1; // default when never set
+
+ public int getSpoutNum() {
+ return spoutNum;
+ }
+
+ public void setSpoutNum(int spoutNum) {
+ this.spoutNum = spoutNum;
+ }
+ }
+
+ private static class RandomEventSpout extends BaseRichSpout { // static: the spout uses nothing from the enclosing application, so it must not hold a hidden reference to it
+ private SpoutOutputCollector collector; // assigned in open(), used by nextTuple()
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ collector = spoutOutputCollector;
+ }
+
+ @Override
+ public void nextTuple() { // emits two fixed sample metrics per call, in the field order declared below
+ collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+ collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+ }
+ }
+
+ public final static class Provider extends AbstractApplicationProvider<TestStormApplication> { // provider that supplies fresh TestStormApplication instances
+ public Provider(){
+ super("TestApplicationMetadata.xml"); // metadata resource name handed to AbstractApplicationProvider
+ }
+ @Override
+ public TestStormApplication getApplication() {
+ return new TestStormApplication();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
index 3db5f20..bbdfbfa 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
@@ -1,97 +1,94 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.storm;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.app.StormApplication;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-
-import java.util.Arrays;
-import java.util.Map;
-
-public class MockStormApplication extends StormApplication<MockStormApplication.MockStormConfiguration> {
- private MockStormConfiguration appConfig;
-
- @Override
- public StormTopology execute(MockStormConfiguration config, StormEnvironment environment) {
- this.setAppConfig(config);
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("metric_spout", new RandomEventSpout(), config.getSpoutNum());
- builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
- builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
- return builder.createTopology();
- }
-
- public MockStormConfiguration getAppConfig() {
- return appConfig;
- }
-
- private void setAppConfig(MockStormConfiguration appConfig) {
- this.appConfig = appConfig;
- }
-
- /**
- * TODO: Load configuration from name space in application className
- * Application Configuration
- */
- static class MockStormConfiguration extends Configuration {
- private int spoutNum = 1;
- private boolean loaded = false;
-
- public int getSpoutNum() {
- return spoutNum;
- }
-
- public void setSpoutNum(int spoutNum) {
- this.spoutNum = spoutNum;
- }
-
- public boolean isLoaded() {
- return loaded;
- }
-
- public void setLoaded(boolean loaded) {
- this.loaded = loaded;
- }
- }
-
- private class RandomEventSpout extends BaseRichSpout {
- private SpoutOutputCollector _collector;
- @Override
- public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
- _collector = spoutOutputCollector;
- }
-
- @Override
- public void nextTuple() {
- _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
- _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
- }
- }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.storm;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+
+import java.util.Arrays;
+import java.util.Map;
+
+public class MockStormApplication extends StormApplication<MockStormApplication.MockStormConfiguration> { // mock application: one random metric spout feeding two stream sinks
+ private MockStormConfiguration appConfig; // NOTE(review): no longer read or written anywhere in this class
+
+ @Override
+ public StormTopology execute(MockStormConfiguration config, StormEnvironment environment) {
+ return null; // the typed-config overload builds nothing; the Config overload below creates the topology
+ }
+
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("metric_spout", new RandomEventSpout(), config.getInt("spoutNum")); // third argument is read from the "spoutNum" config key
+ builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric")); // sink for TEST_STREAM_1, grouped on "metric"
+ builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric")); // sink for TEST_STREAM_2, grouped on "metric"
+ return builder.createTopology();
+ }
+
+ /**
+ * TODO: Load configuration from name space in application className
+ * Application Configuration
+ */
+ static class MockStormConfiguration extends Configuration {
+ private int spoutNum = 1; // default when never set
+ private boolean loaded = false; // default when never set
+
+ public int getSpoutNum() {
+ return spoutNum;
+ }
+
+ public void setSpoutNum(int spoutNum) {
+ this.spoutNum = spoutNum;
+ }
+
+ public boolean isLoaded() {
+ return loaded;
+ }
+
+ public void setLoaded(boolean loaded) {
+ this.loaded = loaded;
+ }
+ }
+
+ private static class RandomEventSpout extends BaseRichSpout { // static: the spout uses nothing from the enclosing application, so it must not hold a hidden reference to it
+ private SpoutOutputCollector collector; // assigned in open(), used by nextTuple()
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ collector = spoutOutputCollector;
+ }
+
+ @Override
+ public void nextTuple() { // emits two fixed sample metrics per call, in the field order declared below
+ collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+ collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
index 8e05b5e..32dae23 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
@@ -1,75 +1,51 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.storm;
-
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.utils.DynamicJarPathFinder;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-
-public class MockStormApplicationTest {
- @Test
- public void testGetConfigClass(){
- MockStormApplication mockStormApplication = new MockStormApplication();
- Assert.assertEquals(MockStormApplication.MockStormConfiguration.class,mockStormApplication.getConfigType());
- }
-
- @Test
- public void testGetConfigFromMap(){
- MockStormApplication mockStormApplication = new MockStormApplication();
- mockStormApplication.execute(new HashMap<String,Object>(){
- {
- put("spoutNum",1234);
- put("loaded",true);
- put("mode", ApplicationEntity.Mode.CLUSTER);
- }
- },new StormEnvironment(ConfigFactory.load()));
- Assert.assertTrue(mockStormApplication.getAppConfig().isLoaded());
- Assert.assertEquals(1234,mockStormApplication.getAppConfig().getSpoutNum());
- Assert.assertEquals(ApplicationEntity.Mode.CLUSTER,mockStormApplication.getAppConfig().getMode());
- }
-
- @Test
- public void testGetConfigFromEnvironmentConfigFile(){
- MockStormApplication mockStormApplication = new MockStormApplication();
- mockStormApplication.execute(new StormEnvironment(ConfigFactory.load()));
- Assert.assertTrue(mockStormApplication.getAppConfig().isLoaded());
- Assert.assertEquals(3,mockStormApplication.getAppConfig().getSpoutNum());
- Assert.assertEquals(ApplicationEntity.Mode.LOCAL,mockStormApplication.getAppConfig().getMode());
- }
-
- @Test
- public void testRunApplicationWithSysConfig(){
- new MockStormApplication().run();
- }
-
- @Test
- public void testRunApplicationWithAppConfig() throws InterruptedException {
- MockStormApplication.MockStormConfiguration appConfig = new MockStormApplication.MockStormConfiguration();
- appConfig.setJarPath(DynamicJarPathFinder.findPath(MockStormApplication.class));
- appConfig.setSiteId("test_site");
- appConfig.setAppId("test_application_storm_topology");
- appConfig.setMode(ApplicationEntity.Mode.LOCAL);
- appConfig.setLoaded(true);
- appConfig.setSpoutNum(4);
- new MockStormApplication().run(appConfig);
- }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.storm;
+
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.utils.DynamicJarPathFinder;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class MockStormApplicationTest {
+ @Test
+ public void testGetConfigClass(){ // the application must report MockStormConfiguration as its config type
+ final MockStormApplication application = new MockStormApplication();
+ Assert.assertEquals(MockStormApplication.MockStormConfiguration.class,application.getConfigType());
+ }
+
+ @Test
+ public void testRunApplicationWithSysConfig(){ // exercises the no-argument run()
+ new MockStormApplication().run();
+ }
+
+ @Test
+ public void testRunApplicationWithAppConfig() throws InterruptedException { // exercises run(config) with an explicitly populated configuration
+ final MockStormApplication.MockStormConfiguration configuration = new MockStormApplication.MockStormConfiguration();
+ configuration.setJarPath(DynamicJarPathFinder.findPath(MockStormApplication.class));
+ configuration.setSiteId("test_site");
+ configuration.setAppId("test_application_storm_topology");
+ configuration.setMode(ApplicationEntity.Mode.LOCAL);
+ configuration.setLoaded(true);
+ configuration.setSpoutNum(4);
+ new MockStormApplication().run(configuration);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
index 9f154f9..64f0974 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
@@ -52,10 +52,15 @@
}
}
- "org.apache.eagle.app.storm.MockStormApplication": {
- "spoutNum": 3
- "loaded": true
- "mode":"LOCAL",
- "appId":"test_topology_name"
+ "appId":"test_topology_name"
+ "spoutNum": 3
+ "loaded": true
+ "mode":"LOCAL"
+
+ "dataSinkConfig": {
+ "topic" : "test_topic",
+ "brokerList" : "sandbox.hortonworks.com:6667",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
index 6e4521d..c651b6b 100644
--- a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
+++ b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
@@ -21,7 +21,6 @@ package org.apache.eagle.service.application;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.policy.common.Constants;
import org.apache.eagle.service.application.dao.ApplicationManagerDAO;
import org.apache.eagle.service.application.dao.ApplicationManagerDaoImpl;
import org.apache.eagle.service.application.entity.TopologyExecutionStatus;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
index edb9fc0..2b37d25 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
@@ -111,11 +111,11 @@ public class GenericServiceAPIResponseEntity<T>{
public String getException() {
return exception;
}
- public void setException(String exception) {
- this.exception = exception;
- }
+// public void setException(String exception) {
+// this.exception = exception;
+// }
- public void setException(Exception exception){
- if(exception!=null) this.exception = EagleExceptionWrapper.wrap(exception);
+ public void setException(Exception exceptionObj){ // a null argument is ignored; otherwise the wrapped exception replaces the stored one
+ if(exceptionObj!=null) this.exception = EagleExceptionWrapper.wrap(exceptionObj);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
index 85b875d..940ee8a 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
@@ -54,7 +54,8 @@ public class GenericServiceAPIResponseEntityDeserializer extends JsonDeserialize
else if(SUCCESS_FIELD.equals(field.getKey()) && field.getValue() != null){
entity.setSuccess(field.getValue().getValueAsBoolean(false));
}else if(EXCEPTION_FIELD.equals(field.getKey()) && field.getValue() != null){
- entity.setException(field.getValue().getTextValue());
+// entity.setException(field.getValue().getTextValue());
+ entity.setException(new Exception(field.getValue().getTextValue()));
}else if(TYPE_FIELD.endsWith(field.getKey()) && field.getValue() != null){
try {
entity.setType(Class.forName(field.getValue().getTextValue()));
@@ -81,4 +82,4 @@ public class GenericServiceAPIResponseEntityDeserializer extends JsonDeserialize
}
return entity;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
index fb52352..b7f27f5 100644
--- a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
@@ -439,7 +439,7 @@ public class GenericEntityServiceResource {
LOG.error("Data storage is null");
throw new IllegalDataStorageException("data storage is null");
}
-
+
QueryResult<?> result = queryStatement.execute(dataStorage);
if(result.isSuccess()){
meta.put(FIRST_TIMESTAMP, result.getFirstTimestamp());
@@ -543,7 +543,7 @@ public class GenericEntityServiceResource {
LOG.error("Data storage is null");
throw new IllegalDataStorageException("Data storage is null");
}
-
+
DeleteStatement deleteStatement = new DeleteStatement(rawQuery);
ModifyResult<String> deleteResult = deleteStatement.execute(dataStorage);
if(deleteResult.isSuccess()){
@@ -627,4 +627,4 @@ public class GenericEntityServiceResource {
}
return response;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
index 6819e59..d4c0e0c 100644
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
+++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
@@ -24,6 +24,7 @@ import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
@@ -33,8 +34,13 @@ import java.util.Map;
public class ExampleStormApplication extends StormApplication<ExampleStormConfig> {
@Override
public StormTopology execute(ExampleStormConfig config, StormEnvironment environment) {
+ return null;
+ }
+
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("metric_spout", new RandomEventSpout(), config.getSpoutNum());
+ builder.setSpout("metric_spout", new RandomEventSpout(), config.getInt("spoutNum"));
builder.setBolt("sink_1",environment.getFlattenStreamSink("SAMPLE_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
builder.setBolt("sink_2",environment.getFlattenStreamSink("SAMPLE_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
return builder.createTopology();
@@ -59,4 +65,4 @@ public class ExampleStormApplication extends StormApplication<ExampleStormConfig
outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
index 45dd7bd..88dd02d 100644
--- a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
+++ b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
@@ -26,10 +26,13 @@ import org.apache.eagle.metadata.model.ApplicationEntity;
import org.apache.eagle.metadata.model.SiteEntity;
import org.apache.eagle.metadata.resource.SiteResource;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
@RunWith(AppUnitTestRunner.class)
public class ExampleApplicationProviderTest {
@@ -54,6 +57,7 @@ public class ExampleApplicationProviderTest {
* @throws InterruptedException
*/
@Test
+ @Ignore
public void testApplicationLifecycle() throws InterruptedException {
// Create local site
SiteEntity siteEntity = new SiteEntity();
@@ -63,8 +67,10 @@ public class ExampleApplicationProviderTest {
siteResource.createSite(siteEntity);
Assert.assertNotNull(siteEntity.getUuid());
+ ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site","EXAMPLE_APPLICATION", ApplicationEntity.Mode.LOCAL);
+ installOperation.setConfiguration(getConf());
// Install application
- ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","EXAMPLE_APPLICATION", ApplicationEntity.Mode.LOCAL)).getData();
+ ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
// Start application
applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
// Stop application
@@ -81,16 +87,29 @@ public class ExampleApplicationProviderTest {
@Test
public void testApplicationQuickRunWithAppType(){
- simulator.start("EXAMPLE_APPLICATION");
+ simulator.start("EXAMPLE_APPLICATION", getConf());
}
+ @Ignore
@Test
- public void testApplicationQuickRunWithAppProvider(){
- simulator.start(ExampleApplicationProvider.class);
+ public void testApplicationQuickRunWithAppProvider() throws Exception{
+ simulator.start(ExampleApplicationProvider.class, getConf());
}
+ @Ignore
@Test
- public void testApplicationQuickRunWithAppProvider2(){
- simulator.start(ExampleApplicationProvider2.class);
+ public void testApplicationQuickRunWithAppProvider2() throws Exception{
+ simulator.start(ExampleApplicationProvider2.class, getConf());
}
-}
\ No newline at end of file
+
+ private Map<String, Object> getConf(){
+ Map<String, Object> conf = new HashMap<>();
+ conf.put("dataSinkConfig.topic", "testTopic");
+ conf.put("dataSinkConfig.brokerList", "broker");
+ conf.put("dataSinkConfig.serializerClass", "serializerClass");
+ conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
+ conf.put("spoutNum", 2);
+ conf.put("mode", "LOCAL");
+ return conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-examples/eagle-app-example/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/resources/application.conf b/eagle-examples/eagle-app-example/src/test/resources/application.conf
index bfbf20e..2873037 100644
--- a/eagle-examples/eagle-app-example/src/test/resources/application.conf
+++ b/eagle-examples/eagle-app-example/src/test/resources/application.conf
@@ -56,7 +56,15 @@
}
},
- "org.apache.eagle.app.example.ExampleStormApplication": {
"appId": "unit_test_example_app"
+ "spoutNum": 3
+ "loaded": true
+ "mode":"LOCAL"
+
+ "dataSinkConfig": {
+ "topic" : "test_topic",
+ "brokerList" : "sandbox.hortonworks.com:6667",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java b/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
index 89e5433..25506cc 100644
--- a/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
+++ b/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
@@ -23,6 +23,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
@@ -32,6 +33,11 @@ import java.util.Map;
public class JPMApplication extends StormApplication<JPMConfiguration> {
@Override
public StormTopology execute(JPMConfiguration config, StormEnvironment environment) {
+ return null;
+ }
+
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("metric_spout", new RandomEventSpout(), 4);
builder.setBolt("sink_1",environment.getFlattenStreamSink("SAMPLE_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
@@ -57,4 +63,4 @@ public class JPMApplication extends StormApplication<JPMConfiguration> {
outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java b/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
index 30bbc96..c955d36 100644
--- a/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
+++ b/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
@@ -27,6 +27,9 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
+import java.util.HashMap;
+import java.util.Map;
+
@RunWith(AppUnitTestRunner.class)
public class JPMApplicationTest {
@Inject
@@ -53,8 +56,11 @@ public class JPMApplicationTest {
siteResource.createSite(siteEntity);
Assert.assertNotNull(siteEntity.getUuid());
+ ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site","JPM_APP", ApplicationEntity.Mode.LOCAL);
+ installOperation.setConfiguration(getConf());
+
// Install application
- ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","JPM_APP", ApplicationEntity.Mode.LOCAL)).getData();
+ ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
// Start application
applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
// Stop application
@@ -68,4 +74,15 @@ public class JPMApplicationTest {
// Expected exception
}
}
+
+ private Map<String, Object> getConf(){
+ Map<String, Object> conf = new HashMap<>();
+ conf.put("dataSinkConfig.topic", "testTopic");
+ conf.put("dataSinkConfig.brokerList", "broker");
+ conf.put("dataSinkConfig.serializerClass", "serializerClass");
+ conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
+ conf.put("spoutNum", 2);
+ conf.put("mode", "LOCAL");
+ return conf;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/pom.xml b/eagle-security/eagle-security-common/pom.xml
index 83971f2..18f9bd0 100644
--- a/eagle-security/eagle-security-common/pom.xml
+++ b/eagle-security/eagle-security-common/pom.xml
@@ -52,6 +52,11 @@
<artifactId>eagle-alert-service</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-metadata-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java
new file mode 100644
index 0000000..e3c9f95
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.service;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * service stub to get metadata from remote metadata service
+ */
+public interface IMetadataServiceClient extends Closeable, Serializable {
+ Collection<HBaseSensitivityEntity> listHBaseSensitivies();
+ OpResult addHBaseSensitivity(Collection<HBaseSensitivityEntity> h);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
index 45e729e..534fb38 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
@@ -18,6 +18,7 @@
package org.apache.eagle.security.service;
import java.util.Collection;
+
/**
* Since 6/10/16.
*/
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
index 53d7132..27aeb57 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
@@ -28,7 +28,7 @@ import java.util.*;
/**
* In memory service for simple service start. Make all service API as
* synchronized.
- *
+ *
* @since Apr 11, 2016
*
*/
@@ -39,8 +39,7 @@ public class InMemMetadataDaoImpl implements ISecurityMetadataDAO {
private Map<Pair<String, String>, HBaseSensitivityEntity> hBaseSensitivityEntities = new HashMap<>();
@Inject
- public InMemMetadataDaoImpl(Config config) {
-
+ public InMemMetadataDaoImpl() {
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
index f17fd43..65e86f0 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
@@ -17,7 +17,6 @@
package org.apache.eagle.security.service;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,41 +27,28 @@ import java.lang.reflect.Constructor;
*
*/
public class MetadataDaoFactory {
-
- private static final MetadataDaoFactory INSTANCE = new MetadataDaoFactory();
private static final Logger LOG = LoggerFactory.getLogger(MetadataDaoFactory.class);
- private ISecurityMetadataDAO dao;
-
- private MetadataDaoFactory() {
- Config config = ConfigFactory.load();
- Config datastoreConfig = config.getConfig("datastore");
- if (datastoreConfig == null) {
- LOG.warn("datastore is not configured, use in-memory store !!!");
- dao = new InMemMetadataDaoImpl(datastoreConfig);
+ public static ISecurityMetadataDAO getMetadataDAO(String storeCls) {
+ ISecurityMetadataDAO dao = null;
+ if (storeCls == null) {
+ LOG.warn("metadata store is not configured, use in-memory store !!!");
+ dao = new InMemMetadataDaoImpl();
} else {
- String clsName = datastoreConfig.getString("metadataDao");
Class<?> clz;
try {
- clz = Thread.currentThread().getContextClassLoader().loadClass(clsName);
+ clz = Thread.currentThread().getContextClassLoader().loadClass(storeCls);
if (ISecurityMetadataDAO.class.isAssignableFrom(clz)) {
- Constructor<?> cotr = clz.getConstructor(Config.class);
- dao = (ISecurityMetadataDAO) cotr.newInstance(datastoreConfig);
+ Constructor<?> cotr = clz.getConstructor();
+ dao = (ISecurityMetadataDAO) cotr.newInstance();
} else {
throw new Exception("metadataDao configuration need to be implementation of IMetadataDao! ");
}
} catch (Exception e) {
LOG.error("error when initialize the dao, fall back to in memory mode!", e);
- dao = new InMemMetadataDaoImpl(datastoreConfig);
+ dao = new InMemMetadataDaoImpl();
}
}
- }
-
- public static MetadataDaoFactory getInstance() {
- return INSTANCE;
- }
-
- public ISecurityMetadataDAO getMetadataDao() {
return dao;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java
new file mode 100644
index 0000000..676bde5
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java
@@ -0,0 +1,143 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.service;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.typesafe.config.Config;
+import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Jersey-based {@link IMetadataServiceClient} that talks to the metadata service over HTTP.
+ *
+ * <p>The Jersey {@link Client} field is {@code transient}, so it is not part of the serialized
+ * form; it is re-created on first use whenever it is missing (for example on a deserialized copy).
+ */
+public class MetadataServiceClientImpl implements IMetadataServiceClient {
+    private static final long serialVersionUID = 3003976065082684128L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(MetadataServiceClientImpl.class);
+
+    private static final String METADATA_LIST_HBASE_SENSITIVITY_PATH = "/metadata/sensitivity/hbase";
+    private static final String METADATA_ADD_HBASE_SENSITIVITY_PATH = "/metadata/sensitivity/hbase";
+
+    private static final String METADATA_CLEAR_PATH = "/metadata/clear";
+
+    private static final String EAGLE_CORRELATION_CONTEXT = "metadataService.context";
+    private static final String EAGLE_CORRELATION_SERVICE_PORT = "metadataService.port";
+    private static final String EAGLE_CORRELATION_SERVICE_HOST = "metadataService.host";
+
+    protected static final String CONTENT_TYPE = "Content-Type";
+
+    private String host;
+    private int port;
+    private String context;
+    private transient Client client;
+    private String basePath;
+
+    /**
+     * Creates a client from the {@code metadataService.host}, {@code metadataService.port} and
+     * {@code metadataService.context} settings of the given configuration.
+     */
+    public MetadataServiceClientImpl(Config config) {
+        this(config.getString(EAGLE_CORRELATION_SERVICE_HOST), config.getInt(EAGLE_CORRELATION_SERVICE_PORT),
+            config.getString(EAGLE_CORRELATION_CONTEXT));
+    }
+
+    /** Creates a client whose base URL is {@code http://host:port} followed by {@code context}. */
+    public MetadataServiceClientImpl(String host, int port, String context) {
+        this.host = host;
+        this.port = port;
+        this.context = context;
+        this.basePath = buildBasePath();
+        this.client = createClient();
+    }
+
+    /** Builds a Jersey client with the Jackson JSON provider, 60 second timeouts and the gzip filter. */
+    private static Client createClient() {
+        ClientConfig cc = new DefaultClientConfig();
+        cc.getProperties().put(DefaultClientConfig.PROPERTY_CONNECT_TIMEOUT, 60 * 1000);
+        cc.getProperties().put(DefaultClientConfig.PROPERTY_READ_TIMEOUT, 60 * 1000);
+        cc.getClasses().add(JacksonJsonProvider.class);
+        cc.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
+        Client created = Client.create(cc);
+        created.addFilter(new com.sun.jersey.api.client.filter.GZIPContentEncodingFilter());
+        return created;
+    }
+
+    /** Returns the Jersey client, re-creating it when the transient field is unset. */
+    private Client getClient() {
+        if (client == null) {
+            client = createClient();
+        }
+        return client;
+    }
+
+    /** Concatenates scheme, host, port and context into the base URL used for every request. */
+    private String buildBasePath() {
+        return "http://" + host + ":" + port + context;
+    }
+
+    /** Issues a GET against {@code basePath + path} and reads the response as a list of {@code T}. */
+    private <T> List<T> list(String path, GenericType<List<T>> type) {
+        WebResource r = getClient().resource(basePath + path);
+        LOG.info("query URL {}", basePath + path);
+        return r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).get(type);
+    }
+
+    /** Destroys the underlying Jersey client if one exists; calling it again is a no-op. */
+    @Override
+    public void close() throws IOException {
+        if (client != null) {
+            client.destroy();
+            client = null;
+        }
+    }
+
+    /** Fetches the HBase sensitivity entities from the metadata service. */
+    @Override
+    public Collection<HBaseSensitivityEntity> listHBaseSensitivies() {
+        return list(METADATA_LIST_HBASE_SENSITIVITY_PATH, new GenericType<List<HBaseSensitivityEntity>>() {
+        });
+    }
+
+    /**
+     * Posts the given HBase sensitivity entities to the metadata service.
+     *
+     * <p>NOTE(review): the HTTP response is not inspected; a default {@link OpResult} is always returned.
+     */
+    @Override
+    public OpResult addHBaseSensitivity(Collection<HBaseSensitivityEntity> h) {
+        WebResource r = getClient().resource(basePath + METADATA_ADD_HBASE_SENSITIVITY_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(h);
+        return new OpResult();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java
deleted file mode 100644
index 05440fb..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.service;
-
-import javax.ws.rs.Path;
-
-/**
- * Since 6/10/16.
- */
-@Path("/metadata/sensitivity")
-public class SensitivityMetadataResource {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
index e60e59e..3401b3c 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
@@ -56,8 +56,6 @@ public class NewKafkaSourcedSpoutProvider implements StormSpoutProvider {
String groupId = context.getString("consumerGroupId");
// Kafka fetch size
int fetchSize = context.getInt("fetchSize");
- // Kafka deserializer class
- String deserClsName = context.getString("deserializerClass");
// Kafka broker zk connection
String zkConnString = context.getString("zkConnection");
// transaction zkRoot
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java
new file mode 100644
index 0000000..662311c
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.hbase;
+
+import org.apache.eagle.app.Configuration;
+
+/**
+ * Configuration type used as the type parameter of HBaseAuditLogApplication; it declares no members of its own beyond what {@link Configuration} provides. Since 8/5/16.
+ */
+public class HBaseAuditLogAppConf extends Configuration{
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
new file mode 100644
index 0000000..051d8c4
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.hbase;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+import org.apache.eagle.security.service.MetadataDaoFactory;
+
+/**
+ * Application provider for {@link HBaseAuditLogApplication}: hands the metadata descriptor path to the superclass and creates application instances. Since 8/5/16.
+ */
+public class HBaseAuditLogAppProvider extends AbstractApplicationProvider<HBaseAuditLogApplication> {
+ public HBaseAuditLogAppProvider() {
+ super("/META-INF/metadata.xml"); // descriptor added by this commit under src/main/resources/META-INF; presumably resolved from the classpath by the superclass - confirm
+ }
+
+ @Override
+ public HBaseAuditLogApplication getApplication() {
+ return new HBaseAuditLogApplication(); // a new instance is created on every call
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
index 49393e1..3d80308 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
@@ -16,27 +16,35 @@
*/
package org.apache.eagle.security.hbase;
+import backtype.storm.generated.StormTopology;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
import storm.kafka.StringScheme;
-import storm.kafka.bolt.KafkaBolt;
/**
* Since 7/27/16.
*/
-public class HBaseAuditLogApplication{
+public class HBaseAuditLogApplication extends StormApplication<HBaseAuditLogAppConf> {
public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks";
public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
- protected void buildApp(TopologyBuilder builder) {
- System.setProperty("config.resource", "/application.conf");
- Config config = ConfigFactory.load();
+ @Override
+ public StormTopology execute(HBaseAuditLogAppConf config1, StormEnvironment environment) { // typed-configuration overload; the topology is built by the execute(Config, StormEnvironment) overload instead
+ return null; // NOTE(review): not implemented - always returns null; confirm that no caller relies on this overload
+ }
+
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ TopologyBuilder builder = new TopologyBuilder();
NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
IRichSpout spout = provider.getSpout(config);
@@ -55,8 +63,15 @@ public class HBaseAuditLogApplication{
BoltDeclarer joinBoltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks);
joinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
- KafkaBolt kafkaBolt = new KafkaBolt();
- BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", kafkaBolt, numOfSinkTasks);
- kafkaBoltDeclarer.shuffleGrouping("joinBolt");
+ StormStreamSink sinkBolt = environment.getFlattenStreamSink("hbase_audit_log_stream",config);
+ BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
+ kafkaBoltDeclarer.fieldsGrouping("joinBolt", new Fields("user"));
+ return builder.createTopology();
+ }
+
+ public static void main(String[] args){ // stand-alone entry point; args is not read
+ Config config = ConfigFactory.load(); // the removed buildApp() set the config.resource system property to /application.conf before loading; that property is no longer set here
+ HBaseAuditLogApplication app = new HBaseAuditLogApplication();
+ app.run(config); // run(Config) is not defined in this class - presumably inherited from StormApplication; confirm
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java
deleted file mode 100644
index 13ca214..0000000
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.security.hbase;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
-import org.apache.eagle.security.topo.TopologySubmitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.*;
-import storm.kafka.bolt.KafkaBolt;
-
-public class HbaseAuditLogMonitoringMain {
- private static Logger LOG = LoggerFactory.getLogger(HbaseAuditLogMonitoringMain.class);
- public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
- public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
- public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks";
- public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
-
- public static void main(String[] args) throws Exception{
- System.setProperty("config.resource", "/application.conf");
- Config config = ConfigFactory.load();
- NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
- IRichSpout spout = provider.getSpout(config);
-
- HBaseAuditLogParserBolt bolt = new HBaseAuditLogParserBolt();
- TopologyBuilder builder = new TopologyBuilder();
-
- int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
- int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
- int numOfJoinTasks = config.getInt(JOIN_TASK_NUM);
- int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
-
- builder.setSpout("ingest", spout, numOfSpoutTasks);
- BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", bolt, numOfParserTasks);
- boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
-
- HbaseResourceSensitivityDataJoinBolt joinBolt = new HbaseResourceSensitivityDataJoinBolt(config);
- BoltDeclarer joinBoltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks);
- joinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
-
- KafkaBolt kafkaBolt = new KafkaBolt();
- BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", kafkaBolt, numOfSinkTasks);
- kafkaBoltDeclarer.shuffleGrouping("joinBolt");
-
- StormTopology topology = builder.createTopology();
-
- TopologySubmitter.submit(topology, config);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
index cf486d3..d8d9d6b 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
@@ -23,14 +23,12 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
import org.apache.eagle.security.entity.HbaseResourceSensitivityAPIEntity;
import org.apache.eagle.security.util.ExternalDataCache;
import org.apache.eagle.security.util.ExternalDataJoiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import java.util.Arrays;
import java.util.Map;
@@ -94,9 +92,7 @@ public class HbaseResourceSensitivityDataJoinBolt extends BaseRichBolt {
}
LOG.info("After hbase resource sensitivity lookup: " + newEvent);
// push to Kafka sink
- ObjectMapper mapper = new ObjectMapper();
- String msg = mapper.writeValueAsString(map);
- collector.emit(Arrays.asList(newEvent.get("user"), msg));
+ collector.emit(Arrays.asList(newEvent.get("user"), newEvent));
}catch(Exception ex){
LOG.error("error joining data, ignore it", ex);
}finally {
@@ -106,6 +102,6 @@ public class HbaseResourceSensitivityDataJoinBolt extends BaseRichBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY, FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE));
+ declarer.declare(new Fields("user", "message")); // same order as the emit in execute(): (user value, event map); HBaseAuditLogApplication groups the sink bolt on the "user" field
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
index 9ca0701..603ed5a 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
@@ -19,10 +19,8 @@ package org.apache.eagle.security.hbase;
import com.google.common.base.Function;
import com.google.common.collect.Maps;
-import org.apache.eagle.security.entity.HbaseResourceSensitivityAPIEntity;
-import org.apache.eagle.security.service.HBaseSensitivityEntity;
-import org.apache.eagle.security.service.ISecurityMetadataDAO;
-import org.apache.eagle.security.service.MetadataDaoFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.security.service.*;
import org.apache.eagle.security.util.AbstractResourceSensitivityPollingJob;
import org.apache.eagle.security.util.ExternalDataCache;
import org.quartz.Job;
@@ -33,7 +31,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
public class HbaseResourceSensitivityPollingJob extends AbstractResourceSensitivityPollingJob implements Job {
@@ -44,12 +41,33 @@ public class HbaseResourceSensitivityPollingJob extends AbstractResourceSensitiv
throws JobExecutionException {
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
try {
- ISecurityMetadataDAO dao = MetadataDaoFactory.getInstance().getMetadataDao();
- Collection<HBaseSensitivityEntity> sensitivityEntities = dao.listHBaseSensitivies();
- ExternalDataCache.getInstance().setJobResult(getClass(), sensitivityEntities);
+ Collection<HBaseSensitivityEntity> sensitivityEntities = load(jobDataMap);
+ Map<String, HBaseSensitivityEntity> map = Maps.uniqueIndex(
+ sensitivityEntities,
+ new Function<HBaseSensitivityEntity, String>() {
+ @Override
+ public String apply(HBaseSensitivityEntity input) {
+ return input.getHbaseResource();
+ }
+ });
+ ExternalDataCache.getInstance().setJobResult(getClass(), map);
} catch(Exception ex) {
LOG.error("Fail to load hbase resource sensitivity data", ex);
}
}
-}
\ No newline at end of file
+ private Collection<HBaseSensitivityEntity> load(JobDataMap jobDataMap) throws Exception { // fetches the HBase sensitivity entities from the eagle service described by the EAGLE_SERVICE entry of the job data map
+ Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE); // unchecked cast; if the entry is absent, map is null and the next line throws NullPointerException
+ String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST);
+ Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString()); // accepts any stored value whose toString() parses as an int; a missing PORT throws NullPointerException
+ String username = map.containsKey(EagleConfigConstants.USERNAME) ? (String)map.get(EagleConfigConstants.USERNAME) : null; // NOTE(review): read but never used below
+ String password = map.containsKey(EagleConfigConstants.PASSWORD) ? (String)map.get(EagleConfigConstants.PASSWORD) : null; // NOTE(review): read but never used below - the client is constructed without credentials
+
+ // load through the eagle metadata service client
+ LOG.info("Load hbase resource sensitivity information from eagle service "
+ + eagleServiceHost + ":" + eagleServicePort);
+
+ IMetadataServiceClient client = new MetadataServiceClientImpl(eagleServiceHost, eagleServicePort, "/rest"); // "/rest" is hard-coded; presumably the service base path - confirm
+ return client.listHBaseSensitivies();
+ }
+}
[3/3] incubator-eagle git commit: HBase audit monitoring with new app
framework https://issues.apache.org/jira/browse/EAGLE-420 Author: Yong Zhang
Reviewer: Hao Chen
Posted by yo...@apache.org.
HBase audit monitoring with new app framework
https://issues.apache.org/jira/browse/EAGLE-420
Author: Yong Zhang
Reviewer: Hao Chen
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/660bfbd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/660bfbd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/660bfbd3
Branch: refs/heads/develop
Commit: 660bfbd3fac6febddb612b9b629a8d466b536b3d
Parents: 1d84256
Author: yonzhang <yo...@gmail.com>
Authored: Sun Aug 7 17:49:48 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Sun Aug 7 17:49:48 2016 -0700
----------------------------------------------------------------------
.../TestSiddhiStateSnapshotAndRestore.java | 506 -------------------
.../resolver/AttributeResolveResource.java | 6 +-
.../apache/eagle/app/AbstractApplication.java | 143 +++---
.../java/org/apache/eagle/app/Application.java | 11 +-
.../eagle/app/environment/ExecutionRuntime.java | 107 ++--
.../environment/impl/SparkExecutionRuntime.java | 117 ++---
.../app/environment/impl/StormEnvironment.java | 84 +--
.../environment/impl/StormExecutionRuntime.java | 327 ++++++------
.../eagle/app/service/ApplicationContext.java | 13 +-
.../impl/ApplicationManagementServiceImpl.java | 36 +-
.../apache/eagle/app/sink/KafkaStreamSink.java | 52 +-
.../eagle/app/sink/KafkaStreamSinkConfig.java | 29 +-
.../eagle/app/sink/LoggingStreamSink.java | 5 +-
.../apache/eagle/app/sink/StormStreamSink.java | 2 +-
.../eagle/app/sink/StreamSinkProvider.java | 93 ++--
.../app/spi/AbstractApplicationProvider.java | 2 +-
.../eagle/app/spi/ApplicationProvider.java | 2 +-
.../eagle/app/test/ServerSimulatorImpl.java | 7 +-
.../apache/eagle/app/TestStormApplication.java | 174 ++++---
.../eagle/app/storm/MockStormApplication.java | 191 ++++---
.../app/storm/MockStormApplicationTest.java | 126 ++---
.../src/test/resources/application.conf | 17 +-
.../ApplicationManagementResource.java | 1 -
.../entity/GenericServiceAPIResponseEntity.java | 12 +-
...ricServiceAPIResponseEntityDeserializer.java | 5 +-
.../generic/GenericEntityServiceResource.java | 6 +-
.../app/example/ExampleStormApplication.java | 10 +-
.../example/ExampleApplicationProviderTest.java | 33 +-
.../src/test/resources/application.conf | 12 +-
.../apache/eagle/app/jpm/JPMApplication.java | 8 +-
.../eagle/app/jpm/JPMApplicationTest.java | 19 +-
eagle-security/eagle-security-common/pom.xml | 5 +
.../service/IMetadataServiceClient.java | 32 ++
.../security/service/ISecurityMetadataDAO.java | 1 +
.../security/service/InMemMetadataDaoImpl.java | 5 +-
.../security/service/MetadataDaoFactory.java | 32 +-
.../service/MetadataServiceClientImpl.java | 114 +++++
.../service/SensitivityMetadataResource.java | 27 -
.../topo/NewKafkaSourcedSpoutProvider.java | 2 -
.../security/hbase/HBaseAuditLogAppConf.java | 28 +
.../hbase/HBaseAuditLogAppProvider.java | 38 ++
.../hbase/HBaseAuditLogApplication.java | 31 +-
.../hbase/HbaseAuditLogMonitoringMain.java | 72 ---
.../HbaseResourceSensitivityDataJoinBolt.java | 8 +-
.../HbaseResourceSensitivityPollingJob.java | 36 +-
.../src/main/resources/META-INF/metadata.xml | 243 +++++++++
...org.apache.eagle.app.spi.ApplicationProvider | 35 ++
.../src/main/resources/application.conf | 8 +-
.../src/main/resources/application.conf.bak | 66 ---
.../src/main/resources/metadata.xml | 91 ----
.../src/main/resources/scripts | 22 +
.../hbase/TestHbaseAuditLogProcessTopology.java | 44 --
.../hbase/SensitivityMetadataResource.java | 64 +++
eagle-server/pom.xml | 14 +
.../src/main/resources/application.conf | 2 +-
.../src/main/resources/configuration.yml | 21 +
.../eagle/server/ServerApplicationTest.java | 4 +-
.../src/test/resources/configuration.yml | 21 +
58 files changed, 1609 insertions(+), 1613 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
deleted file mode 100644
index 131be28..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.state;
-
-import org.apache.eagle.common.DateTimeUtil;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-import org.wso2.siddhi.core.util.persistence.InMemoryPersistenceStore;
-import org.wso2.siddhi.core.util.persistence.PersistenceStore;
-
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-
-/**
- * experiment Siddhi state snapshot and restore
- */
-public class TestSiddhiStateSnapshotAndRestore {
- private ExecutionPlanRuntime setupRuntimeForSimple(){
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "" +
- "define stream testStream (cmd string, src string) ;";
- String queryString = "" +
- "@info(name = 'query1') " +
- "from testStream[(cmd == 'rename') and (src == '/tmp/pii')] " +
- "select cmd, src " +
- "insert into outputStream ;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + queryString);
-
- QueryCallback callback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- };
- executionPlanRuntime.addCallback("query1", callback);
- executionPlanRuntime.start();
- return executionPlanRuntime;
- }
-
- @Test
- public void testSimpleSiddhiQuery() throws Exception{
- String tmpdir = System.getProperty("java.io.tmpdir");
- System.out.println("temporary directory: " + tmpdir);
-
- String stateFile = tmpdir + "/siddhi-state";
- ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForSimple();
- executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii"});
- byte[] state = executionPlanRuntime.snapshot();
- int length = state.length;
- FileOutputStream output = new FileOutputStream(stateFile);
- output.write(state);
- output.close();
- executionPlanRuntime.shutdown();
-
- ExecutionPlanRuntime restoredRuntime = setupRuntimeForSimple();
- FileInputStream input = new FileInputStream(stateFile);
- byte[] restoredState = new byte[length];
- input.read(restoredState);
- restoredRuntime.restore(restoredState);
- restoredRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii"});
- input.close();
- restoredRuntime.shutdown();
- }
-
- private ExecutionPlanRuntime setupRuntimeForLengthSlideWindow(){
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream testStream (user string, cmd string);";
- String query = "@info(name = 'query1') from testStream#window.length(3) "
- + " select *"
- + " insert all events into OutputStream";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
- executionPlanRuntime.addCallback("query1", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- });
-
- executionPlanRuntime.start();
- return executionPlanRuntime;
- }
-
- @Ignore
- public void testLengthSlideWindow() throws Exception{
- String tmpdir = System.getProperty("java.io.tmpdir");
- System.out.println("temporary directory: " + tmpdir);
-
- String stateFile = tmpdir + "/siddhi-state-lengthslidewindow";
- ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForLengthSlideWindow();
- executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_1"});
- executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_2"});
- executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_3"});
- executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_4"});
- byte[] state = executionPlanRuntime.snapshot();
- int length = state.length;
- FileOutputStream output = new FileOutputStream(stateFile);
- output.write(state);
- output.close();
- executionPlanRuntime.shutdown();
-
- ExecutionPlanRuntime restoredRuntime = setupRuntimeForLengthSlideWindow();
- FileInputStream input = new FileInputStream(stateFile);
- byte[] restoredState = new byte[length];
- input.read(restoredState);
- restoredRuntime.restore(restoredState);
- restoredRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_5"});
- input.close();
- restoredRuntime.shutdown();
- }
-
- private ExecutionPlanRuntime setupRuntimeForLengthSlideWindowWithGroupby(){
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream testStream (user string, cmd string);";
- String query = "@info(name = 'query1') from testStream#window.length(50) "
- + " select user, cmd, count(user) as cnt"
- + " insert all events into OutputStream";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
- executionPlanRuntime.addCallback("query1", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- });
-
- executionPlanRuntime.start();
- return executionPlanRuntime;
- }
-
- @Ignore
- public void testLengthSlideWindowWithGroupby() throws Exception{
- String tmpdir = System.getProperty("java.io.tmpdir");
- System.out.println("temporary directory: " + tmpdir);
-
- String stateFile = tmpdir + "/siddhi-state-lengthslidewindowwithgroupby";
- ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForLengthSlideWindowWithGroupby();
- executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_1"});
- executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_2"});
- executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_3"});
- executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_4"});
- byte[] state = executionPlanRuntime.snapshot();
- int length = state.length;
- FileOutputStream output = new FileOutputStream(stateFile);
- output.write(state);
- output.close();
- executionPlanRuntime.shutdown();
-
- ExecutionPlanRuntime restoredRuntime = setupRuntimeForLengthSlideWindowWithGroupby();
- FileInputStream input = new FileInputStream(stateFile);
- byte[] restoredState = new byte[length];
- input.read(restoredState);
- restoredRuntime.restore(restoredState);
- restoredRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_5"});
- input.close();
- restoredRuntime.shutdown();
- }
-
- private ExecutionPlanRuntime setupRuntimeForTimeSlideWindow(){
- SiddhiManager siddhiManager = new SiddhiManager();
- String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
- String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
- + " select user, timeStamp " +
- "insert all events into outputStream;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
- executionPlanRuntime.addCallback("query1", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- });
- executionPlanRuntime.start();
- return executionPlanRuntime;
- }
-
- @Test
- public void testTimeSlideWindow() throws Exception{
- String tmpdir = System.getProperty("java.io.tmpdir");
- System.out.println("temporary directory: " + tmpdir);
-
- String stateFile = tmpdir + "/siddhi-state-timeslidewindow";
- ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForTimeSlideWindow();
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
- long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
- inputHandler.send(new Object[]{curTime, "user", "open"});
- inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
-
- byte[] state = executionPlanRuntime.snapshot();
- int length = state.length;
- FileOutputStream output = new FileOutputStream(stateFile);
- output.write(state);
- output.close();
- executionPlanRuntime.shutdown();
-
- ExecutionPlanRuntime restoredRuntime = setupRuntimeForTimeSlideWindow();
- FileInputStream input = new FileInputStream(stateFile);
- byte[] restoredState = new byte[length];
- input.read(restoredState);
- restoredRuntime.restore(restoredState);
- inputHandler = restoredRuntime.getInputHandler("testStream");
- inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
- Thread.sleep(1000);
- input.close();
- restoredRuntime.shutdown();
- }
-
- private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby(){
- SiddhiManager siddhiManager = new SiddhiManager();
- String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
- String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,30 sec)"
- + " select user, timeStamp, count(user) as cnt"
- + " group by user"
- + " having cnt > 2"
- + " insert all events into outputStream;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
- executionPlanRuntime.addCallback("query1", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- });
- executionPlanRuntime.start();
- return executionPlanRuntime;
- }
-
- @Test
- public void testExternalTimeSlideWindowWithGroupby() throws Exception{
- String tmpdir = System.getProperty("java.io.tmpdir");
- System.out.println("temporary directory: " + tmpdir);
-
- String stateFile = tmpdir + "/siddhi-state-externaltimeslidewindow";
- ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby();
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
- long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
- inputHandler.send(new Object[]{curTime, "user", "open"});
- inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
- Thread.sleep(1000);
-
- byte[] state = executionPlanRuntime.snapshot();
- int length = state.length;
- FileOutputStream output = new FileOutputStream(stateFile);
- output.write(state);
- output.close();
- executionPlanRuntime.shutdown();
-
- ExecutionPlanRuntime restoredRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby();
- FileInputStream input = new FileInputStream(stateFile);
- byte[] restoredState = new byte[length];
- input.read(restoredState);
- restoredRuntime.restore(restoredState);
- inputHandler = restoredRuntime.getInputHandler("testStream");
- inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
- Thread.sleep(1000);
- input.close();
- restoredRuntime.shutdown();
- }
-
- private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby_2(SiddhiManager siddhiManager){
- String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
- String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,300 sec)"
- + " select user, timeStamp, count(user) as cnt"
- + " group by user"
- + " insert all events into outputStream;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
- executionPlanRuntime.addCallback("query1", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- });
- return executionPlanRuntime;
- }
-
- @Test
- public void testExternalTimeSlideWindowWithGroupby_2() throws Exception{
- SiddhiManager siddhiManager = new SiddhiManager();
- PersistenceStore persistenceStore = new InMemoryPersistenceStore();
- siddhiManager.setPersistenceStore(persistenceStore);
-
- ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
- executionPlanRuntime.start();
- long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
- inputHandler.send(new Object[]{curTime, "user", "open"});
- inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
- Thread.sleep(100);
- executionPlanRuntime.persist();
- executionPlanRuntime.shutdown();
- ExecutionPlanRuntime restoredRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
- inputHandler = restoredRuntime.getInputHandler("testStream");
- restoredRuntime.start();
- restoredRuntime.restoreLastRevision();
- inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
- inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
- Thread.sleep(1000);
- restoredRuntime.shutdown();
- }
-
- private ExecutionPlanRuntime setupRuntimeForInternalTimeSlideWindowWithGroupby(){
- SiddhiManager siddhiManager = new SiddhiManager();
- String cseEventStream = "define stream testStream (user string, cmd string);";
- String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.time(5 sec)"
- + " select user, count(user) as cnt"
- + " group by user"
- + " having cnt > 2"
- + " insert events into outputStream;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
- executionPlanRuntime.addCallback("query1", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- });
- executionPlanRuntime.start();
- return executionPlanRuntime;
- }
-
- @Test
- public void testInternalTimeSlideWindowWithGroupby() throws Exception{
- String tmpdir = System.getProperty("java.io.tmpdir");
- System.out.println("temporary directory: " + tmpdir);
-
- String stateFile = tmpdir + "/siddhi-state-internaltimeslidewindow";
- ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForInternalTimeSlideWindowWithGroupby();
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
- inputHandler.send(new Object[]{"user", "open"});
- Thread.sleep(1000);
- inputHandler.send(new Object[]{"user", "open"});
- Thread.sleep(1000);
- inputHandler.send(new Object[]{"user", "open"});
- Thread.sleep(1000);
- inputHandler.send(new Object[]{"user", "open"});
-
- byte[] state = executionPlanRuntime.snapshot();
- int length = state.length;
- FileOutputStream output = new FileOutputStream(stateFile);
- output.write(state);
- output.close();
- executionPlanRuntime.shutdown();
-
- ExecutionPlanRuntime restoredRuntime = setupRuntimeForInternalTimeSlideWindowWithGroupby();
- FileInputStream input = new FileInputStream(stateFile);
- byte[] restoredState = new byte[length];
- input.read(restoredState);
- restoredRuntime.restore(restoredState);
- inputHandler = restoredRuntime.getInputHandler("testStream");
- inputHandler.send(new Object[]{"user", "open"});
- Thread.sleep(1000);
- inputHandler.send(new Object[]{"user", "open"});
- Thread.sleep(1000);
- inputHandler.send(new Object[]{"user", "open"});
- Thread.sleep(1000);
- inputHandler.send(new Object[]{"user", "open"});
- Thread.sleep(1000);
- input.close();
- restoredRuntime.shutdown();
- }
-
- private int count;
- private boolean eventArrived;
-
- @Before
- public void init() {
- count = 0;
- eventArrived = false;
- }
-
- /**
- * Siddhi does not support external time window based snapshot
- * @throws InterruptedException
- */
- public void persistenceTest7() throws InterruptedException {
- PersistenceStore persistenceStore = new InMemoryPersistenceStore();
-
- SiddhiManager siddhiManager = new SiddhiManager();
- siddhiManager.setPersistenceStore(persistenceStore);
-
- String executionPlan = "" +
- "@plan:name('Test') " +
- "" +
- "define stream StockStream (symbol string, price float, volume int, timestamp long);" +
- "" +
- "@info(name = 'query1')" +
- "from StockStream#window.externalTime(timestamp,30 sec) " +
- "select symbol, price, sum(volume) as totalVol, count(symbol) as cnt " +
- "group by symbol " +
- "insert into OutStream ";
-
- QueryCallback queryCallback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- eventArrived = true;
- for (Event inEvent : inEvents) {
- count++;
- Assert.assertTrue("IBM".equals(inEvent.getData(0)) || "WSO2".equals(inEvent.getData(0)));
- if (count == 5) {
- Assert.assertEquals(400l, inEvent.getData(2));
- }
- if (count == 6) {
- Assert.assertEquals(200l, inEvent.getData(2));
- }
- }
- }
- };
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
- executionPlanRuntime.addCallback("query1", queryCallback);
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("StockStream");
- executionPlanRuntime.start();
- long currentTime = 0;
-
- inputHandler.send(new Object[]{"IBM", 75.1f, 100, currentTime + 1000});
- Thread.sleep(100);
- inputHandler.send(new Object[]{"WSO2", 75.2f, 100, currentTime + 2000});
- Thread.sleep(100);
- inputHandler.send(new Object[]{"IBM", 75.3f, 100, currentTime + 3000});
-
- Thread.sleep(500);
- Assert.assertTrue(eventArrived);
- Assert.assertEquals(3, count);
-
- //persisting
- Thread.sleep(500);
- executionPlanRuntime.persist();
-
- //restarting execution plan
- Thread.sleep(500);
- executionPlanRuntime.shutdown();
- executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
- executionPlanRuntime.addCallback("query1", queryCallback);
- inputHandler = executionPlanRuntime.getInputHandler("StockStream");
- executionPlanRuntime.start();
-
- //loading
- executionPlanRuntime.restoreLastRevision();
-
- inputHandler.send(new Object[]{"IBM", 75.4f, 100, currentTime + 4000});
- Thread.sleep(100);
- inputHandler.send(new Object[]{"IBM", 75.5f, 100, currentTime + 5000});
- Thread.sleep(100);
- inputHandler.send(new Object[]{"WSO2", 75.6f, 100, currentTime + 6000});
-
- //shutdown execution plan
- Thread.sleep(500);
- executionPlanRuntime.shutdown();
-
- Assert.assertEquals(count, 6);
- Assert.assertEquals(true, eventArrived);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
index 68995d1..bcffec1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
@@ -49,7 +49,7 @@ public class AttributeResolveResource {
response.setObj(result);
} catch (Exception e) {
response.setSuccess(false);
- response.setException(EagleExceptionWrapper.wrap(e));
+ response.setException(e);
return response;
}
return response;
@@ -73,9 +73,9 @@ public class AttributeResolveResource {
response.setObj(result);
} catch (Exception e) {
response.setSuccess(false);
- response.setException(EagleExceptionWrapper.wrap(e));
+ response.setException(e);
return response;
}
return response;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
index dca2e3d..5b498eb 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
@@ -1,69 +1,74 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.app.environment.Environment;
-import org.apache.eagle.app.environment.ExecutionRuntimeManager;
-import org.apache.eagle.app.utils.ApplicationConfigHelper;
-
-import java.lang.reflect.ParameterizedType;
-import java.util.Map;
-
-abstract class AbstractApplication<Conf extends Configuration,Env extends Environment,Proc> implements Application<Conf,Env,Proc>, ApplicationTool<Conf> {
- private Class<Conf> parametrizedConfigClass;
-
- @Override
- public Proc execute(Map<String, Object> config, Env env) {
- return execute(ApplicationConfigHelper.convertFrom(config, getConfigType()),env);
- }
-
- /**
- * Map application configuration from environment
- *
- * @param config
- * @return
- */
- private Conf loadAppConfigFromEnv(Config config){
- return ApplicationConfigHelper.convertFrom(ApplicationConfigHelper.unwrapFrom(config,getClass().getCanonicalName()), getConfigType());
- }
-
- @Override
- public void run(Config config) {
- ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(),config).start(this,loadAppConfigFromEnv(config));
- }
-
- @Override
- public void run(Configuration conf, Config config) {
- ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(), config).start(this,conf);
- }
-
- @Override
- public Proc execute(Env environment) {
- return execute(loadAppConfigFromEnv(environment.config()),environment);
- }
-
- /**
- * @return Config class from Generic Type
- */
- public Class<Conf> getConfigType(){
- if (parametrizedConfigClass == null) {
- this.parametrizedConfigClass = (Class<Conf>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
- }
- return parametrizedConfigClass;
- }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.Environment;
+import org.apache.eagle.app.environment.ExecutionRuntimeManager;
+import org.apache.eagle.app.utils.ApplicationConfigHelper;
+
+import java.lang.reflect.ParameterizedType;
+import java.util.Map;
+
+abstract class AbstractApplication<Conf extends Configuration,Env extends Environment,Proc> implements Application<Conf,Env,Proc>, ApplicationTool<Conf> {
+ private Class<Conf> parametrizedConfigClass; // resolved on first use by getConfigType()
+
+ @Override
+ public Proc execute(Map<String, Object> config, Env env) {
+ return execute(ApplicationConfigHelper.convertFrom(config, getConfigType()),env);
+ }
+
+ @Override
+ public Proc execute(Config config, Env environment){
+ return null; // NOTE(review): base implementation yields no Proc; presumably subclasses override this - confirm
+ }
+ /**
+ * Map application configuration from environment
+ *
+ * @param config config handed to ApplicationConfigHelper.unwrapFrom together with this class's canonical name
+ * @return the unwrapped value converted to the type returned by {@link #getConfigType()}
+ */
+ private Conf loadAppConfigFromEnv(Config config){
+ return ApplicationConfigHelper.convertFrom(ApplicationConfigHelper.unwrapFrom(config,getClass().getCanonicalName()), getConfigType());
+ }
+
+ @Override
+ public void run(Config config) {
+// Earlier form passed loadAppConfigFromEnv(config) as the second argument of start(...); the Config itself is passed now.
+ ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(),config).start(this,config);
+ }
+
+ @Override
+ public void run(Configuration conf, Config config) {
+// NOTE(review): currently a no-op - the earlier ExecutionRuntimeManager...start(this,conf) call is disabled, so conf and config go unused; confirm this is intended.
+ }
+
+ @Override
+ public Proc execute(Env environment) {
+ return execute(loadAppConfigFromEnv(environment.config()),environment);
+ }
+
+ /**
+ * @return Config class read from the first type argument of the generic superclass; cached in a field after the first call
+ */
+ public Class<Conf> getConfigType(){
+ if (parametrizedConfigClass == null) {
+ this.parametrizedConfigClass = (Class<Conf>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+ }
+ return parametrizedConfigClass;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
index 699b13e..d1e4c9b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
@@ -16,6 +16,7 @@
*/
package org.apache.eagle.app;
+import com.typesafe.config.Config;
import org.apache.eagle.app.environment.Environment;
import java.io.Serializable;
@@ -63,6 +64,14 @@ public interface Application <
Proc execute(Map<String,Object> config, Env environment);
/**
+ * Execute with type-safe configuration
+ * @param config type-safe configuration ({@link Config}) for the application
+ * @param environment environment of type Env the application is executed with
+ * @return the Proc value for this application; NOTE(review): its concrete meaning depends on the environment and is not visible here
+ */
+ Proc execute(Config config, Env environment);
+
+ /**
* Execute with environment based configuration
*
* Light-weight Runner (dry-run/test purpose) oriented interface
@@ -81,4 +90,4 @@ public interface Application <
* @return application environment type
*/
Class<? extends Env> getEnvironmentType();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
index 7605d92..c4e89f4 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
@@ -1,53 +1,54 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment;
-
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.Configuration;
-
-/**
- * Execution Runtime Adapter
- */
-public interface ExecutionRuntime<Env extends Environment, Proc> {
- /**
- * @param environment
- */
- void prepare(Env environment);
-
- Env environment();
-
- /**
- * @param executor
- * @param config
- * @param <Conf>
- */
- <Conf extends Configuration> void start(Application<Conf,Env, Proc> executor, Conf config);
-
- /**
- * @param executor
- * @param config
- * @param <Conf>
- */
- <Conf extends Configuration> void stop(Application<Conf,Env, Proc> executor, Conf config);
-
- /**
- * @param executor
- * @param config
- * @param <Conf>
- */
- <Conf extends Configuration> void status(Application<Conf,Env, Proc> executor, Conf config);
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.Configuration;
+
+/**
+ * Execution Runtime Adapter
+ */
+public interface ExecutionRuntime<Env extends Environment, Proc> {
+ /**
+ * @param environment environment of type Env handed to this runtime
+ */
+ void prepare(Env environment);
+
+ Env environment();
+
+ /**
+ * @param executor application the start call applies to
+ * @param config type-safe configuration ({@link Config}) supplied with the call
+ * @param <Conf> configuration type of the application, bounded by {@link Configuration}
+ */
+ <Conf extends Configuration> void start(Application<Conf,Env, Proc> executor, Config config);
+
+ /**
+ * @param executor application the stop call applies to
+ * @param config type-safe configuration ({@link Config}) supplied with the call
+ * @param <Conf> configuration type of the application, bounded by {@link Configuration}
+ */
+ <Conf extends Configuration> void stop(Application<Conf,Env, Proc> executor, Config config);
+
+ /**
+ * @param executor application the status call applies to
+ * @param config type-safe configuration ({@link Config}) supplied with the call
+ * @param <Conf> configuration type of the application, bounded by {@link Configuration}
+ */
+ <Conf extends Configuration> void status(Application<Conf,Env, Proc> executor, Config config);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
index 5bcde92..5d4e049 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
@@ -1,58 +1,59 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.app.environment.ExecutionRuntime;
-import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
-
-public class SparkExecutionRuntime implements ExecutionRuntime<SparkEnvironment,Object> {
- @Override
- public void prepare(SparkEnvironment environment) {
- throw new RuntimeException("Not implemented yet");
- }
-
- @Override
- public SparkEnvironment environment() {
- throw new RuntimeException("Not implemented yet");
- }
-
- @Override
- public void start(Application executor, Configuration config) {
-
- throw new RuntimeException("Not implemented yet");
- }
-
- @Override
- public void stop(Application executor, Configuration config) {
-
- throw new RuntimeException("Not implemented yet");
- }
-
- @Override
- public void status(Application executor, Configuration config) {
- throw new RuntimeException("Not implemented yet");
- }
-
- public static class Provider implements ExecutionRuntimeProvider<SparkEnvironment,Object> {
- @Override
- public SparkExecutionRuntime get() {
- return new SparkExecutionRuntime();
- }
- }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.environment.ExecutionRuntime;
+import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
+
+public class SparkExecutionRuntime implements ExecutionRuntime<SparkEnvironment,Object> { // placeholder: every ExecutionRuntime method below throws RuntimeException("Not implemented yet")
+ @Override
+ public void prepare(SparkEnvironment environment) {
+ throw new RuntimeException("Not implemented yet");
+ }
+
+ @Override
+ public SparkEnvironment environment() {
+ throw new RuntimeException("Not implemented yet");
+ }
+
+ @Override
+ public void start(Application executor, Config config) { // NOTE(review): Application is used as a raw type here and in stop/status
+
+ throw new RuntimeException("Not implemented yet");
+ }
+
+ @Override
+ public void stop(Application executor, Config config) {
+
+ throw new RuntimeException("Not implemented yet");
+ }
+
+ @Override
+ public void status(Application executor, Config config) {
+ throw new RuntimeException("Not implemented yet");
+ }
+
+ public static class Provider implements ExecutionRuntimeProvider<SparkEnvironment,Object> { // get() returns a new SparkExecutionRuntime on every call
+ @Override
+ public SparkExecutionRuntime get() {
+ return new SparkExecutionRuntime();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
index 4b4a0be..1112588 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
@@ -1,42 +1,42 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.app.environment.AbstractEnvironment;
-import org.apache.eagle.app.sink.FlattenEventMapper;
-import org.apache.eagle.app.sink.LoggingStreamSink;
-import org.apache.eagle.app.sink.StormStreamSink;
-import org.apache.eagle.app.sink.StreamSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Int;
-import storm.trident.spout.RichSpoutBatchExecutor;
-
-/**
- * Storm Execution Environment Context
- */
-public class StormEnvironment extends AbstractEnvironment {
- public StormEnvironment(Config envConfig) {
- super(envConfig);
- }
-
- public StormStreamSink getFlattenStreamSink(String streamId,Configuration appConfig) {
- return ((StormStreamSink) streamSink().getSink(streamId,appConfig)).setEventMapper(new FlattenEventMapper(streamId));
- }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.environment.AbstractEnvironment;
+import org.apache.eagle.app.sink.FlattenEventMapper;
+import org.apache.eagle.app.sink.LoggingStreamSink;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.sink.StreamSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Int;
+import storm.trident.spout.RichSpoutBatchExecutor;
+
+/**
+ * Storm Execution Environment Context
+ */
+public class StormEnvironment extends AbstractEnvironment {
+ public StormEnvironment(Config envConfig) { // forwards envConfig to the AbstractEnvironment constructor
+ super(envConfig);
+ }
+
+ public StormStreamSink getFlattenStreamSink(String streamId, Config config) { // gets the sink for streamId, casts it to StormStreamSink and sets a FlattenEventMapper(streamId) on it
+ return ((StormStreamSink) streamSink().getSink(streamId,config)).setEventMapper(new FlattenEventMapper(streamId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 23e7334..06c22dc 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -1,162 +1,165 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.*;
-import backtype.storm.utils.NimbusClient;
-import com.google.common.base.Preconditions;
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.app.environment.ExecutionRuntime;
-import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
-import org.apache.eagle.app.utils.DynamicJarPathFinder;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.thrift7.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Int;
-import storm.trident.spout.RichSpoutBatchExecutor;
-
-public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
- private final static Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
- private static LocalCluster _localCluster;
-
- private StormEnvironment environment;
-
-// static {
-// Runtime.getRuntime().addShutdownHook(new Thread(){
-// @Override
-// public void run() {
-// if(_localCluster != null) {
-// LOG.info("Shutting down local storm cluster instance");
-// _localCluster.shutdown();
-// }
-// }
-// });
-// }
-
- private static LocalCluster getLocalCluster(){
- if(_localCluster == null){
- _localCluster = new LocalCluster();
- }
- return _localCluster;
- }
-
- @Override
- public void prepare(StormEnvironment environment) {
- this.environment = environment;
- }
-
- @Override
- public StormEnvironment environment() {
- return this.environment;
- }
-
- private final static String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
- private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
- private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
- private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
-
- public backtype.storm.Config getStormConfig(){
- backtype.storm.Config conf = new backtype.storm.Config();
- conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
- conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
- conf.put(backtype.storm.Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32));
- conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384));
- conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
- conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
- String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
- if(environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
- nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
- LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
- } else {
- LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
- }
- Integer nimbusThriftPort = STORM_NIMBUS_THRIFT_DEFAULT;
- if(environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
- nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
- LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
- } else {
- LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
- }
- conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost);
- conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
- return conf;
- }
-
- @Override
- public <Conf extends Configuration> void start(Application<Conf, StormEnvironment, StormTopology> executor, Conf config){
- String topologyName = config.getAppId();
- Preconditions.checkNotNull(topologyName,"[appId] is required by null for "+executor.getClass().getCanonicalName());
- StormTopology topology = executor.execute(config, environment);
- LOG.info("Starting {} ({})",topologyName,executor.getClass().getCanonicalName());
- Config conf = getStormConfig();
- if(config.getMode() == ApplicationEntity.Mode.CLUSTER){
- if(config.getJarPath() == null) config.setJarPath(DynamicJarPathFinder.findPath(executor.getClass()));
- String jarFile = config.getJarPath();
- synchronized (StormExecutionRuntime.class) {
- System.setProperty("storm.jar", jarFile);
- LOG.info("Submitting as cluster mode ...");
- try {
- StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
- } catch (AlreadyAliveException | InvalidTopologyException e) {
- LOG.error(e.getMessage(), e);
- throw new RuntimeException(e.getMessage(),e);
- } finally {
- System.clearProperty("storm.jar");
- }
- }
- } else {
- LOG.info("Submitting as local mode ...");
- getLocalCluster().submitTopology(topologyName, conf, topology);
- LOG.info("Submitted");
- }
- }
-
- @Override
- public <Conf extends Configuration> void stop(Application<Conf,StormEnvironment, StormTopology> executor, Conf config) {
- String appId = config.getAppId();
- if(config.getMode() == ApplicationEntity.Mode.CLUSTER){
- Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig()).getClient();
- try {
- stormClient.killTopology(appId);
- } catch (NotAliveException | TException e) {
- LOG.error("Failed to kill topology named {}, due to: {}",appId,e.getMessage(),e.getCause());
- }
- } else {
- KillOptions killOptions = new KillOptions();
- killOptions.set_wait_secs(0);
- getLocalCluster().killTopologyWithOpts(appId,killOptions);
- }
- }
-
- @Override
- public <Conf extends Configuration> void status(Application<Conf,StormEnvironment, StormTopology> executor, Conf config) {
- // TODO: Not implemented yet!
- throw new RuntimeException("TODO: Not implemented yet!");
- }
-
- public static class Provider implements ExecutionRuntimeProvider<StormEnvironment,StormTopology> {
- @Override
- public StormExecutionRuntime get() {
- return new StormExecutionRuntime();
- }
- }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.*;
+import backtype.storm.utils.NimbusClient;
+import com.google.common.base.Preconditions;
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.environment.ExecutionRuntime;
+import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
+import org.apache.eagle.app.utils.DynamicJarPathFinder;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.thrift7.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Int;
+import storm.trident.spout.RichSpoutBatchExecutor;
+
+/**
+ * {@link ExecutionRuntime} implementation that runs applications as Storm topologies.
+ *
+ * <p>When the application config entry {@code "mode"} equals the name of
+ * {@code ApplicationEntity.Mode.CLUSTER}, topologies are submitted through
+ * {@link StormSubmitter} and killed through a Nimbus client; for any other value a lazily
+ * created, process-wide {@link LocalCluster} is used instead.
+ */
+public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
+    private final static Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
+    private static LocalCluster _localCluster;
+
+    private StormEnvironment environment;
+
+// static {
+// Runtime.getRuntime().addShutdownHook(new Thread(){
+// @Override
+// public void run() {
+// if(_localCluster != null) {
+// LOG.info("Shutting down local storm cluster instance");
+// _localCluster.shutdown();
+// }
+// }
+// });
+// }
+
+    /**
+     * Returns the shared local cluster, creating it on first use.
+     * NOTE(review): the lazy initialisation is not synchronized - confirm callers are single-threaded.
+     */
+    private static LocalCluster getLocalCluster(){
+        if(_localCluster == null){
+            _localCluster = new LocalCluster();
+        }
+        return _localCluster;
+    }
+
+    /**
+     * Stores the environment whose config is later read by {@link #getStormConfig()}.
+     *
+     * @param environment environment to bind to this runtime
+     */
+    @Override
+    public void prepare(StormEnvironment environment) {
+        this.environment = environment;
+    }
+
+    /**
+     * @return the environment passed to {@link #prepare(StormEnvironment)}, or {@code null} if
+     *         {@code prepare} has not been called
+     */
+    @Override
+    public StormEnvironment environment() {
+        return this.environment;
+    }
+
+    private final static String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
+    private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
+    private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
+    private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
+
+    /**
+     * Builds the Storm configuration used for submitting and killing topologies.
+     *
+     * <p>Buffer and batch sizes are fixed values; the Nimbus host and thrift port are read from
+     * the environment config when the corresponding paths exist and otherwise fall back to
+     * {@code localhost} and {@code 6627}.
+     *
+     * @return a freshly created Storm config
+     */
+    public backtype.storm.Config getStormConfig(){
+        backtype.storm.Config conf = new backtype.storm.Config();
+        conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
+        conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
+        conf.put(backtype.storm.Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32));
+        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384));
+        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
+        conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
+        String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
+        if(environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
+            nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
+            LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
+        } else {
+            LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
+        }
+        Integer nimbusThriftPort = STORM_NIMBUS_THRIFT_DEFAULT;
+        if(environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
+            nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
+            LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
+        } else {
+            LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
+        }
+        conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost);
+        conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
+        return conf;
+    }
+
+    /**
+     * Tells whether the application config asks for cluster mode.
+     * Uses {@code equals}: the configured value is a runtime string, so reference comparison
+     * ({@code ==}) against the enum name is not reliable.
+     */
+    private static boolean isClusterMode(com.typesafe.config.Config config) {
+        return ApplicationEntity.Mode.CLUSTER.name().equals(config.getString("mode"));
+    }
+
+    /**
+     * Builds the application's topology and submits it under the name found at {@code "appId"}.
+     *
+     * <p>In cluster mode the jar at {@code "jarPath"} is used; when that path is not configured
+     * the jar containing the application class is located through {@link DynamicJarPathFinder}.
+     *
+     * @param executor application that produces the topology
+     * @param config   application config; {@code "appId"} and {@code "mode"} are read and must be present
+     * @throws RuntimeException if cluster submission fails because the topology is already alive or invalid
+     */
+    @Override
+    public <Conf extends Configuration> void start(Application<Conf, StormEnvironment, StormTopology> executor, com.typesafe.config.Config config){
+        String topologyName = config.getString("appId");
+        Preconditions.checkNotNull(topologyName,"[appId] is required by null for "+executor.getClass().getCanonicalName());
+        StormTopology topology = executor.execute(config, environment);
+        LOG.info("Starting {} ({})",topologyName,executor.getClass().getCanonicalName());
+        Config conf = getStormConfig();
+        if(isClusterMode(config)){
+            // getString() throws for an absent path instead of returning null, so probe with
+            // hasPath() to make the jar-discovery fallback reachable.
+            String jarFile = config.hasPath("jarPath")
+                    ? config.getString("jarPath")
+                    : DynamicJarPathFinder.findPath(executor.getClass());
+            // The "storm.jar" system property is process-wide; it is set and cleared under the
+            // class lock, presumably so concurrent submissions cannot see each other's jar.
+            synchronized (StormExecutionRuntime.class) {
+                System.setProperty("storm.jar", jarFile);
+                LOG.info("Submitting as cluster mode ...");
+                try {
+                    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
+                } catch (AlreadyAliveException | InvalidTopologyException e) {
+                    LOG.error(e.getMessage(), e);
+                    throw new RuntimeException(e.getMessage(),e);
+                } finally {
+                    System.clearProperty("storm.jar");
+                }
+            }
+        } else {
+            LOG.info("Submitting as local mode ...");
+            getLocalCluster().submitTopology(topologyName, conf, topology);
+            LOG.info("Submitted");
+        }
+    }
+
+    /**
+     * Kills the topology named by {@code "appId"}.
+     *
+     * <p>In cluster mode the kill goes through a Nimbus client and failures are logged, not
+     * rethrown; otherwise the topology is killed on the local cluster with a wait of 0 seconds.
+     *
+     * @param executor application being stopped (not used by this implementation)
+     * @param config   application config; {@code "appId"} and {@code "mode"} are read and must be present
+     */
+    @Override
+    public <Conf extends Configuration> void stop(Application<Conf,StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
+        String appId = config.getString("appId");
+        if(isClusterMode(config)){
+            Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig()).getClient();
+            try {
+                stormClient.killTopology(appId);
+            } catch (NotAliveException | TException e) {
+                // Pass the exception itself (not its possibly-null cause) so the stack trace is kept.
+                LOG.error("Failed to kill topology named {}, due to: {}",appId,e.getMessage(),e);
+            }
+        } else {
+            KillOptions killOptions = new KillOptions();
+            killOptions.set_wait_secs(0);
+            getLocalCluster().killTopologyWithOpts(appId,killOptions);
+        }
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @throws RuntimeException always
+     */
+    @Override
+    public <Conf extends Configuration> void status(Application<Conf,StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
+        // TODO: Not implemented yet!
+        throw new RuntimeException("TODO: Not implemented yet!");
+    }
+
+    /**
+     * Provider that hands out a new {@link StormExecutionRuntime} on every call.
+     */
+    public static class Provider implements ExecutionRuntimeProvider<StormEnvironment,StormTopology> {
+        @Override
+        public StormExecutionRuntime get() {
+            return new StormExecutionRuntime();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
index 7a0de82..90137e5 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
@@ -18,6 +18,7 @@ package org.apache.eagle.app.service;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.eagle.app.Application;
import org.apache.eagle.app.ApplicationLifecycle;
import org.apache.eagle.app.Configuration;
@@ -45,7 +46,7 @@ import java.util.stream.Collectors;
* </ul>
*/
public class ApplicationContext implements Serializable, ApplicationLifecycle {
- private final Configuration config;
+ private final Config config;
private final Application application;
private final ExecutionRuntime runtime;
private final ApplicationEntity metadata;
@@ -54,19 +55,17 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle {
* @param metadata ApplicationEntity
* @param application Application
*/
- public ApplicationContext(Application application, ApplicationEntity metadata, Config config){
+ public ApplicationContext(Application application, ApplicationEntity metadata, Config config1){
Preconditions.checkNotNull(application,"Application is null");
Preconditions.checkNotNull(metadata,"ApplicationEntity is null");
this.application = application;
this.metadata = metadata;
- this.runtime = ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(),config);
+ this.runtime = ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(),config1);
Map<String,Object> applicationConfig = metadata.getConfiguration();
if(applicationConfig == null) {
applicationConfig = Collections.emptyMap();
}
- this.config = ApplicationConfigHelper.convertFrom(applicationConfig,application.getConfigType());
- this.config.setMode(metadata.getMode());
- this.config.setAppId(metadata.getAppId());
+ this.config = ConfigFactory.parseMap(applicationConfig);
}
@Override
@@ -100,4 +99,4 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle {
public ApplicationEntity getMetadata() {
return metadata;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
index 88c9f4d..c98e7cc 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
@@ -20,19 +20,26 @@ import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.eagle.app.service.ApplicationContext;
import org.apache.eagle.app.service.ApplicationOperations;
import org.apache.eagle.app.service.ApplicationManagementService;
import org.apache.eagle.app.service.ApplicationProviderService;
+import org.apache.eagle.app.spi.ApplicationProvider;
import org.apache.eagle.metadata.exceptions.EntityNotFoundException;
import org.apache.eagle.metadata.model.ApplicationDesc;
import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.Property;
import org.apache.eagle.metadata.model.SiteEntity;
import org.apache.eagle.metadata.service.ApplicationEntityService;
import org.apache.eagle.metadata.service.SiteEntityService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
@Singleton
public class ApplicationManagementServiceImpl implements ApplicationManagementService {
private final SiteEntityService siteEntityService;
@@ -53,6 +60,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
this.applicationEntityService = applicationEntityService;
}
+ @Override
public ApplicationEntity install(ApplicationOperations.InstallOperation operation) throws EntityNotFoundException {
Preconditions.checkNotNull(operation.getSiteId(),"siteId is null");
Preconditions.checkNotNull(operation.getAppType(),"appType is null");
@@ -63,16 +71,38 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
ApplicationEntity applicationEntity = new ApplicationEntity();
applicationEntity.setDescriptor(appDesc);
applicationEntity.setSite(siteEntity);
- applicationEntity.setConfiguration(operation.getConfiguration());
+
+ /**
+ * calculate application config based on
+ * 1) default values in metadata.xml
+ * 2) user's config value
+ * 3) some metadata, for example siteId, mode, appId
+ */
+ Map<String, Object> appConfig = new HashMap<>();
+ ApplicationProvider provider = applicationProviderService.getApplicationProviderByType(operation.getAppType());
+ List<Property> propertyList = provider.getApplicationDesc().getConfiguration().getProperties();
+ for(Property p : propertyList){
+ appConfig.put(p.getName(), p.getValue());
+ }
+ if(operation.getConfiguration() != null) {
+ appConfig.putAll(operation.getConfiguration());
+ }
+ appConfig.put("siteId", operation.getSiteId());
+ appConfig.put("mode", operation.getMode().name());
+ appConfig.put("appId", operation.getAppType());
+
+ applicationEntity.setConfiguration(appConfig);
applicationEntity.setMode(operation.getMode());
ApplicationContext applicationContext = new ApplicationContext(
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
applicationEntity,config);
applicationContext.onInstall();
applicationEntityService.create(applicationEntity);
+
return applicationEntity;
}
+ @Override
public ApplicationEntity uninstall(ApplicationOperations.UninstallOperation operation) {
ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
ApplicationContext applicationContext = new ApplicationContext(
@@ -88,6 +118,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
return applicationEntityService.delete(applicationEntity);
}
+ @Override
public ApplicationEntity start(ApplicationOperations.StartOperation operation) {
ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
ApplicationContext applicationContext = new ApplicationContext(
@@ -97,6 +128,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
return applicationEntity;
}
+ @Override
public ApplicationEntity stop(ApplicationOperations.StopOperation operation) {
ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
ApplicationContext applicationContext = new ApplicationContext(
@@ -105,4 +137,4 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
applicationContext.onStop();
return applicationEntity;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index dda58d6..e4adfd2 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -17,32 +17,60 @@
package org.apache.eagle.app.sink;
import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.app.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
+import java.util.Properties;
public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
- private final static Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class);
+ private final static Logger LOG = LoggerFactory.getLogger(KafkaStreamSink.class);
private String topicId;
+ private Producer producer;
+ private KafkaStreamSinkConfig config;
@Override
public void init(String streamId, KafkaStreamSinkConfig config) {
super.init(streamId, config);
this.topicId = config.getTopicId();
+ this.config = config;
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
- // TODO: Create KafkaProducer
+ Properties properties = new Properties();
+ properties.put("metadata.broker.list", config.getBrokerList());
+ properties.put("serializer.class", config.getSerializerClass());
+ properties.put("key.serializer.class", config.getKeySerializerClass());
+ ProducerConfig producerConfig = new ProducerConfig(properties);
+ producer = new Producer(producerConfig);
}
@Override
protected void onEvent(StreamEvent streamEvent) {
- LOGGER.info("TODO: producing {} to '{}'",streamEvent,topicId);
+ }
+
+ /** Shared JSON serializer, hoisted so a new mapper is not built for every tuple. */
+ private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
+ /**
+  * Serializes the map found at tuple index 1 to JSON and sends it to the configured Kafka
+  * topic, keyed by the map's {@code "user"} entry.
+  *
+  * <p>Any failure is logged and reported to the collector instead of being rethrown.
+  *
+  * @param input     tuple whose value at index 1 is cast to a {@link Map}
+  * @param collector collector that receives reported errors
+  */
+ @Override
+ public void execute(Tuple input, BasicOutputCollector collector) {
+     LOG.info("TODO: producing {} to '{}'", input, topicId);
+
+     try {
+         Map m = (Map) input.getValue(1);
+         String output = JSON_MAPPER.writeValueAsString(m);
+         producer.send(new KeyedMessage(this.topicId, m.get("user"), output));
+     }catch(Exception ex){
+         // Name the topic so the failure can be found in the logs; the exception stays the last argument.
+         LOG.error("Failed to produce event to kafka topic '{}'", topicId, ex);
+         collector.reportError(ex);
+     }
+ }
@Override
@@ -51,11 +79,11 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
}
private void ensureTopicCreated(){
- LOGGER.info("TODO: ensure kafka topic {} created",this.topicId);
+ LOG.info("TODO: ensure kafka topic {} created",this.topicId);
}
private void ensureTopicDeleted(){
- LOGGER.info("TODO: ensure kafka topic {} deleted",this.topicId);
+ LOG.info("TODO: ensure kafka topic {} deleted",this.topicId);
}
@Override
@@ -65,12 +93,12 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
public static class Provider implements StreamSinkProvider<KafkaStreamSink,KafkaStreamSinkConfig> {
@Override
- public KafkaStreamSinkConfig getSinkConfig(String streamId, Configuration appConfig) {
- String topicId = String.format("EAGLE.%s.%s",
- appConfig.getSiteId(),
- streamId).toLowerCase();
+ public KafkaStreamSinkConfig getSinkConfig(String streamId, Config config) {
KafkaStreamSinkConfig desc = new KafkaStreamSinkConfig();
- desc.setTopicId(topicId);
+ desc.setTopicId(config.getString("dataSinkConfig.topic"));
+ desc.setBrokerList(config.getString("dataSinkConfig.brokerList"));
+ desc.setSerializerClass(config.getString("dataSinkConfig.serializerClass"));
+ desc.setKeySerializerClass(config.getString("dataSinkConfig.keySerializerClass"));
return desc;
}
@@ -79,4 +107,4 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
return new KafkaStreamSink();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
index 17a3aa8..9d6a0ab 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
@@ -20,6 +20,9 @@ import org.apache.eagle.metadata.model.StreamSinkConfig;
public class KafkaStreamSinkConfig implements StreamSinkConfig {
private String topicId;
+ private String brokerList;
+ private String serializerClass;
+ private String keySerializerClass;
public String getTopicId() {
return topicId;
@@ -29,6 +32,30 @@ public class KafkaStreamSinkConfig implements StreamSinkConfig {
this.topicId = topicId;
}
+ /** @return the broker list stored in this sink config, or {@code null} if none was set */
+ public String getBrokerList() {
+ return brokerList;
+ }
+
+ /** @param brokerList broker list to store in this sink config */
+ public void setBrokerList(String brokerList) {
+ this.brokerList = brokerList;
+ }
+
+ /** @return the serializer class name stored in this sink config, or {@code null} if none was set */
+ public String getSerializerClass() {
+ return serializerClass;
+ }
+
+ /** @param serializerClass serializer class name to store in this sink config */
+ public void setSerializerClass(String serializerClass) {
+ this.serializerClass = serializerClass;
+ }
+
+ /** @return the key serializer class name stored in this sink config, or {@code null} if none was set */
+ public String getKeySerializerClass() {
+ return keySerializerClass;
+ }
+
+ /** @param keySerializerClass key serializer class name to store in this sink config */
+ public void setKeySerializerClass(String keySerializerClass) {
+ this.keySerializerClass = keySerializerClass;
+ }
+
@Override
public String getType() {
return "KAFKA";
@@ -43,4 +70,4 @@ public class KafkaStreamSinkConfig implements StreamSinkConfig {
public Class<? extends StreamSinkConfig> getConfigType() {
return KafkaStreamSinkConfig.class;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
index 3efd811..0d835d6 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
@@ -16,6 +16,7 @@
*/
package org.apache.eagle.app.sink;
+import com.typesafe.config.Config;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.model.StreamEvent;
import org.apache.eagle.app.Configuration;
@@ -43,7 +44,7 @@ public class LoggingStreamSink extends StormStreamSink<DefaultStreamSinkConfig>
public static class Provider implements StreamSinkProvider<LoggingStreamSink,DefaultStreamSinkConfig> {
@Override
- public DefaultStreamSinkConfig getSinkConfig(String streamId, Configuration appConfig) {
+ public DefaultStreamSinkConfig getSinkConfig(String streamId, Config config) {
return new DefaultStreamSinkConfig(LoggingStreamSink.class);
}
@@ -52,4 +53,4 @@ public class LoggingStreamSink extends StormStreamSink<DefaultStreamSinkConfig>
return new LoggingStreamSink();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
index 1b35a99..6765443 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
@@ -85,4 +85,4 @@ public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseBa
public String getStreamId() {
return streamId;
}
-}
\ No newline at end of file
+}