You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/14 23:52:53 UTC
[2/2] incubator-eagle git commit: clean up eagle-data-process Author:
@yonzhang2012 Closes: #343
clean up eagle-data-process
Author: @yonzhang2012 <yo...@gmail.com>
Closes: #343
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b4732cb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b4732cb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b4732cb2
Branch: refs/heads/develop
Commit: b4732cb2fc104350e2f9e6c3edb8518f41a21a11
Parents: c66b525
Author: yonzhang <yo...@gmail.com>
Authored: Sun Aug 14 16:56:44 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Sun Aug 14 16:56:44 2016 -0700
----------------------------------------------------------------------
.../AbstractDynamicApplication.scala | 30 -----
.../src/main/java/META-INF/MANIFEST.MF | 19 ---
.../impl/storm/kafka/JsonSerializer.java | 58 ---------
.../storm/kafka/KafkaSourcedSpoutProvider.java | 104 ----------------
.../storm/kafka/KafkaSourcedSpoutScheme.java | 71 -----------
.../impl/storm/kafka/KafkaSpoutProvider.java | 118 +++++++++++++++++++
.../kafka/NewKafkaSourcedSpoutProvider.java | 118 -------------------
.../eagle/datastream/utils/JavaReflections.java | 31 -----
.../src/main/resources/application.conf | 1 -
.../eagle/datastream/utils/ReflectionS.scala | 55 ---------
.../entity/AbstractPolicyDefinitionEntity.java | 27 -----
.../alert/entity/AlertStreamSchemaEntity.java | 111 -----------------
.../eagle/policy/DefaultPolicyPartitioner.java | 32 -----
.../eagle/policy/PolicyEvaluationContext.java | 34 ------
.../apache/eagle/policy/PolicyEvaluator.java | 61 ----------
.../apache/eagle/policy/PolicyPartitioner.java | 26 ----
.../org/apache/eagle/policy/ResultRender.java | 32 -----
.../eagle/policy/executor/IPolicyExecutor.java | 29 -----
.../policy/siddhi/SiddhiEvaluationHandler.java | 27 -----
.../src/main/resources/log4j.properties | 21 ----
.../org/apache/eagle/gc/GCLogApplication.java | 4 +-
eagle-hadoop-metric/pom.xml | 24 ----
.../assembly/eagle-hadoop-metric-assembly.xml | 64 ----------
.../hadoop/metric/HadoopJmxApplication.java | 32 +++++
.../eagle/hadoop/metric/JsonParserBolt.java | 62 ++++++++++
.../org/apache/eagle/hadoop/metric/Utils.java | 64 ----------
.../src/main/resources/application.conf | 52 +++-----
.../kafka/EagleMetricCollectorApplication.java | 71 +----------
.../metric/kafka/KafkaSourcedSpoutProvider.java | 93 +++++++++++++++
.../metric/kafka/KafkaSourcedSpoutScheme.java | 72 +++++++++++
.../hbase/HBaseAuditLogApplication.java | 4 +-
.../AbstractHdfsAuditLogApplication.java | 4 +-
.../securitylog/HdfsAuthLogMonitoringMain.java | 4 +-
.../oozie/parse/OozieAuditLogApplication.java | 4 +-
34 files changed, 405 insertions(+), 1154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
deleted file mode 100644
index b5fbf59..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import com.typesafe.config.Config
-import org.apache.eagle.datastream.core.StreamContext
-
-
-trait AbstractDynamicApplication extends TopologyExecutable {
- def compileStream(application: String, config: Config): StreamContext = {
- null
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-common/src/main/java/META-INF/MANIFEST.MF
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/META-INF/MANIFEST.MF b/eagle-core/eagle-common/src/main/java/META-INF/MANIFEST.MF
deleted file mode 100644
index c67816b..0000000
--- a/eagle-core/eagle-common/src/main/java/META-INF/MANIFEST.MF
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-Manifest-Version: 1.0
-Class-Path:
-
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
deleted file mode 100644
index 416aaa3..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.kafka;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class JsonSerializer implements Serializer<Object> {
- private final StringSerializer stringSerializer = new StringSerializer();
- private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);
- private static final ObjectMapper om = new ObjectMapper();
-
- static {
- om.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, true);
- }
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- stringSerializer.configure(configs,isKey);
- }
-
- @Override
- public byte[] serialize(String topic, Object data) {
- String str = null;
- try {
- str = om.writeValueAsString(data);
- } catch (IOException e) {
- logger.error("Kafka serialization for send error!", e);
- }
- return stringSerializer.serialize(topic, str);
- }
-
- @Override
- public void close() {
- stringSerializer.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
deleted file mode 100644
index c6a4983..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.kafka;
-
-import java.util.Arrays;
-
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpout;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.topology.base.BaseRichSpout;
-
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
-
-public class KafkaSourcedSpoutProvider implements StormSpoutProvider {
- private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class);
-
- public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
- return new SchemeAsMultiScheme(new KafkaSourcedSpoutScheme(deserClsName, context));
- }
-
- private String configPrefix = "dataSourceConfig";
-
- public KafkaSourcedSpoutProvider(){}
-
- public KafkaSourcedSpoutProvider(String prefix){
- this.configPrefix = prefix;
- }
-
- @Override
- public BaseRichSpout getSpout(Config config){
- Config context = config;
- if(this.configPrefix!=null) context = config.getConfig(configPrefix);
- // Kafka topic
- String topic = context.getString("topic");
- // Kafka consumer group id
- String groupId = context.getString("consumerGroupId");
- // Kafka fetch size
- int fetchSize = context.getInt("fetchSize");
- // Kafka deserializer class
- String deserClsName = context.getString("deserializerClass");
- // Kafka broker zk connection
- String zkConnString = context.getString("zkConnection");
- // transaction zkRoot
- String zkRoot = context.getString("transactionZKRoot");
-
- LOG.info(String.format("Use topic id: %s",topic));
-
- String brokerZkPath = null;
- if(context.hasPath("brokerZkPath")) {
- brokerZkPath = context.getString("brokerZkPath");
- }
-
- BrokerHosts hosts;
- if(brokerZkPath == null) {
- hosts = new ZkHosts(zkConnString);
- } else {
- hosts = new ZkHosts(zkConnString, brokerZkPath);
- }
-
- SpoutConfig spoutConfig = new SpoutConfig(hosts,
- topic,
- zkRoot + "/" + topic,
- groupId);
-
- // transaction zkServers
- spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(","));
- // transaction zkPort
- spoutConfig.zkPort = context.getInt("transactionZKPort");
- // transaction update interval
- spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS");
- // Kafka fetch size
- spoutConfig.fetchSizeBytes = fetchSize;
- // "startOffsetTime" is for test usage, prod should not use this
- if (context.hasPath("startOffsetTime")) {
- spoutConfig.startOffsetTime = context.getInt("startOffsetTime");
- }
- // "forceFromStart" is for test usage, prod should not use this
- if (context.hasPath("forceFromStart")) {
- spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
- }
-
- spoutConfig.scheme = getStreamScheme(deserClsName, context);
- return new KafkaSpout(spoutConfig);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
deleted file mode 100644
index 15401fd..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.kafka;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.utils.NameConstants;
-
-import java.lang.reflect.Constructor;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * This scheme defines how a kafka message is deserialized and the output field name for storm stream
- * it includes the following:
- * 1. data source is kafka, so need kafka message deserializer class
- * 2. output field declaration
- */
-public class KafkaSourcedSpoutScheme implements Scheme {
- protected SpoutKafkaMessageDeserializer deserializer;
-
- public KafkaSourcedSpoutScheme(String deserClsName, Config context){
- try{
- Properties prop = new Properties();
- if(context.hasPath("eagleProps")) {
- prop.putAll(context.getObject("eagleProps"));
- }
- Constructor<?> constructor = Class.forName(deserClsName).getConstructor(Properties.class);
- deserializer = (SpoutKafkaMessageDeserializer) constructor.newInstance(prop);
- }catch(Exception ex){
- throw new RuntimeException("Failed to create new instance for decoder class " + deserClsName, ex);
- }
- }
-
- @Override
- public List<Object> deserialize(byte[] ser) {
- Object tmp = deserializer.deserialize(ser);
- if(tmp == null)
- return null;
- // the following tasks are executed within the same process of kafka spout
- return Arrays.asList(tmp);
- }
-
- /**
- * Default only f0, but it requires to be overrode if different
- *
- * TODO: Handle the schema with KeyValue based structure
- *
- * @return Fields
- */
- @Override
- public Fields getOutputFields() {
- return new Fields(NameConstants.FIELD_PREFIX()+"0");
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java
new file mode 100644
index 0000000..2d2936c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * *
+ *
+ */
+
+package org.apache.eagle.dataproc.impl.storm.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpout;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+
+import java.util.Arrays;
+
+/**
+ * Since 6/8/16.
+ */
+public class KafkaSpoutProvider implements StormSpoutProvider {
+ private final static Logger LOG = LoggerFactory.getLogger(KafkaSpoutProvider.class);
+
+ private String configPrefix = "dataSourceConfig";
+
+ public KafkaSpoutProvider(){}
+
+ public KafkaSpoutProvider(String prefix){
+ this.configPrefix = prefix;
+ }
+
+ @Override
+ public BaseRichSpout getSpout(Config config){
+ Config context = config;
+ if(this.configPrefix!=null) context = config.getConfig(configPrefix);
+ // Kafka topic
+ String topic = context.getString("topic");
+ // Kafka consumer group id
+ String groupId = context.getString("consumerGroupId");
+ // Kafka fetch size
+ int fetchSize = context.getInt("fetchSize");
+ // Kafka broker zk connection
+ String zkConnString = context.getString("zkConnection");
+ // transaction zkRoot
+ String zkRoot = context.getString("transactionZKRoot");
+
+ LOG.info(String.format("Use topic id: %s",topic));
+
+ String brokerZkPath = null;
+ if(context.hasPath("brokerZkPath")) {
+ brokerZkPath = context.getString("brokerZkPath");
+ }
+
+ BrokerHosts hosts;
+ if(brokerZkPath == null) {
+ hosts = new ZkHosts(zkConnString);
+ } else {
+ hosts = new ZkHosts(zkConnString, brokerZkPath);
+ }
+
+ SpoutConfig spoutConfig = new SpoutConfig(hosts,
+ topic,
+ zkRoot + "/" + topic,
+ groupId);
+
+ // transaction zkServers
+ spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(","));
+ // transaction zkPort
+ spoutConfig.zkPort = context.getInt("transactionZKPort");
+ // transaction update interval
+ spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS");
+ // Kafka fetch size
+ spoutConfig.fetchSizeBytes = fetchSize;
+ // "startOffsetTime" is for test usage, prod should not use this
+ if (context.hasPath("startOffsetTime")) {
+ spoutConfig.startOffsetTime = context.getInt("startOffsetTime");
+ }
+ // "forceFromStart" is for test usage, prod should not use this
+ if (context.hasPath("forceFromStart")) {
+ spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+ }
+
+ if (context.hasPath("schemeCls")) {
+ try {
+ Scheme s = (Scheme)Class.forName(context.getString("schemeCls")).newInstance();
+ spoutConfig.scheme = new SchemeAsMultiScheme(s);
+ }catch(Exception ex){
+ LOG.error("error instantiating scheme object");
+ throw new IllegalStateException(ex);
+ }
+ }else{
+ String err = "schemeCls must be present";
+ LOG.error(err);
+ throw new IllegalStateException(err);
+ }
+ return new KafkaSpout(spoutConfig);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java
deleted file mode 100644
index d764ac1..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- * *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- * *
- *
- */
-
-package org.apache.eagle.dataproc.impl.storm.kafka;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.topology.base.BaseRichSpout;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpout;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
-
-import java.util.Arrays;
-
-/**
- * Since 6/8/16.
- */
-public class NewKafkaSourcedSpoutProvider implements StormSpoutProvider {
- private final static Logger LOG = LoggerFactory.getLogger(NewKafkaSourcedSpoutProvider.class);
-
- private String configPrefix = "dataSourceConfig";
-
- public NewKafkaSourcedSpoutProvider(){}
-
- public NewKafkaSourcedSpoutProvider(String prefix){
- this.configPrefix = prefix;
- }
-
- @Override
- public BaseRichSpout getSpout(Config config){
- Config context = config;
- if(this.configPrefix!=null) context = config.getConfig(configPrefix);
- // Kafka topic
- String topic = context.getString("topic");
- // Kafka consumer group id
- String groupId = context.getString("consumerGroupId");
- // Kafka fetch size
- int fetchSize = context.getInt("fetchSize");
- // Kafka broker zk connection
- String zkConnString = context.getString("zkConnection");
- // transaction zkRoot
- String zkRoot = context.getString("transactionZKRoot");
-
- LOG.info(String.format("Use topic id: %s",topic));
-
- String brokerZkPath = null;
- if(context.hasPath("brokerZkPath")) {
- brokerZkPath = context.getString("brokerZkPath");
- }
-
- BrokerHosts hosts;
- if(brokerZkPath == null) {
- hosts = new ZkHosts(zkConnString);
- } else {
- hosts = new ZkHosts(zkConnString, brokerZkPath);
- }
-
- SpoutConfig spoutConfig = new SpoutConfig(hosts,
- topic,
- zkRoot + "/" + topic,
- groupId);
-
- // transaction zkServers
- spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(","));
- // transaction zkPort
- spoutConfig.zkPort = context.getInt("transactionZKPort");
- // transaction update interval
- spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS");
- // Kafka fetch size
- spoutConfig.fetchSizeBytes = fetchSize;
- // "startOffsetTime" is for test usage, prod should not use this
- if (context.hasPath("startOffsetTime")) {
- spoutConfig.startOffsetTime = context.getInt("startOffsetTime");
- }
- // "forceFromStart" is for test usage, prod should not use this
- if (context.hasPath("forceFromStart")) {
- spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
- }
-
- if (context.hasPath("schemeCls")) {
- try {
- Scheme s = (Scheme)Class.forName(context.getString("schemeCls")).newInstance();
- spoutConfig.scheme = new SchemeAsMultiScheme(s);
- }catch(Exception ex){
- LOG.error("error instantiating scheme object");
- throw new IllegalStateException(ex);
- }
- }else{
- String err = "schemeCls must be present";
- LOG.error(err);
- throw new IllegalStateException(err);
- }
- return new KafkaSpout(spoutConfig);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java
deleted file mode 100644
index 04b4bed..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.utils;
-
-import java.lang.reflect.ParameterizedType;
-
-/**
- * @since 12/7/15
- */
-class JavaReflections {
- @SuppressWarnings("unchecked")
- public static Class<?> getGenericTypeClass(final Object obj,int index) {
- return (Class<?>) ((ParameterizedType) obj
- .getClass()
- .getGenericSuperclass()).getActualTypeArguments()[index];
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
index 72c2ae5..c386a71 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
@@ -30,7 +30,6 @@
"zkConnectionTimeoutMS" : 15000,
"consumerGroupId" : "eagle.consumer",
"fetchSize" : 1048586,
- "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
"transactionZKServers" : "sandbox.hortonworks.com",
"transactionZKPort" : 2181,
"transactionZKRoot" : "/consumers",
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
deleted file mode 100644
index 1d48752..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.utils
-
-import scala.reflect.api
-import scala.reflect.runtime.{universe => ru}
-
-/**
- * @since 12/7/15
- */
-object Reflections{
- private val UNIT_CLASS = classOf[Unit]
- private val UNIT_TYPE_TAG = ru.typeTag[Unit]
-
- /**
- * Class to TypeTag
- * @param clazz class
- * @tparam T Type T
- * @return
- */
- def typeTag[T](clazz:Class[T]):ru.TypeTag[T]={
- if(clazz == null){
- null
- }else if(clazz == UNIT_CLASS) {
- UNIT_TYPE_TAG.asInstanceOf[ru.TypeTag[T]]
- } else {
- val mirror = ru.runtimeMirror(clazz.getClassLoader)
- val sym = mirror.staticClass(clazz.getCanonicalName)
- val tpe = sym.selfType
- ru.TypeTag(mirror, new api.TypeCreator {
- def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
- if (m eq mirror) tpe.asInstanceOf[U#Type]
- else throw new IllegalArgumentException(s"Type tag defined in $mirror cannot be migrated to other mirrors.")
- })
- }
- }
-
- def javaTypeClass[T](obj: AnyRef, index: Int = 0):Class[T] = JavaReflections.getGenericTypeClass(obj,index).asInstanceOf[Class[T]]
- def javaTypeTag[T](obj: AnyRef, index: Int = 0):ru.TypeTag[T] = typeTag(JavaReflections.getGenericTypeClass(obj,index)).asInstanceOf[ru.TypeTag[T]]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java
deleted file mode 100644
index 3f45be7..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.entity;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-@SuppressWarnings("serial")
-public abstract class AbstractPolicyDefinitionEntity extends TaggedLogAPIEntity {
-
- public abstract String getPolicyDef();
-
- public abstract boolean isEnabled();
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
deleted file mode 100644
index 4dd9006..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.entity;
-
-import org.apache.eagle.policy.common.Constants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.Column;
-import org.apache.eagle.log.entity.meta.ColumnFamily;
-import org.apache.eagle.log.entity.meta.Prefix;
-import org.apache.eagle.log.entity.meta.Service;
-import org.apache.eagle.log.entity.meta.Table;
-import org.apache.eagle.log.entity.meta.Tags;
-import org.apache.eagle.log.entity.meta.TimeSeries;
-
-/**
- * ddl to create streammetadata table
- *
- * create 'alertStreamSchema', {NAME => 'f', BLOOMFILTER => 'ROW', VERSIONS => '1', COMPRESSION => 'SNAPPY'}
- */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("alertStreamSchema")
-@ColumnFamily("f")
-@Prefix("alertStreamSchema")
-@Service(Constants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"application", "streamName", "attrName"})
-public class AlertStreamSchemaEntity extends TaggedLogAPIEntity{
- @Column("a")
- private String attrType;
- @Column("b")
- private String category;
- @Column("c")
- private String attrValueResolver;
- /* all tags form the key for alert de-duplication */
- @Column("d")
- private Boolean usedAsTag;
- @Column("e")
- private String attrDescription;
- @Column("f")
- private String attrDisplayName;
- @Column("g")
- private String defaultValue;
-
- public String getAttrType() {
- return attrType;
- }
- public void setAttrType(String attrType) {
- this.attrType = attrType;
- valueChanged("attrType");
- }
- public String getCategory() {
- return category;
- }
- public void setCategory(String category) {
- this.category = category;
- valueChanged("category");
- }
- public String getAttrValueResolver() {
- return attrValueResolver;
- }
- public void setAttrValueResolver(String attrValueResolver) {
- this.attrValueResolver = attrValueResolver;
- valueChanged("attrValueResolver");
- }
- public Boolean getUsedAsTag() {
- return usedAsTag;
- }
- public void setUsedAsTag(Boolean usedAsTag) {
- this.usedAsTag = usedAsTag;
- valueChanged("usedAsTag");
- }
- public String getAttrDescription() {
- return attrDescription;
- }
- public void setAttrDescription(String attrDescription) {
- this.attrDescription = attrDescription;
- valueChanged("attrDescription");
- }
- public String getAttrDisplayName() {
- return attrDisplayName;
- }
- public void setAttrDisplayName(String attrDisplayName) {
- this.attrDisplayName = attrDisplayName;
- valueChanged("attrDisplayName");
- }
- public String getDefaultValue() {
- return defaultValue;
- }
- public void setDefaultValue(String defaultValue) {
- this.defaultValue = defaultValue;
- valueChanged("defaultValue");
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java
deleted file mode 100644
index 1143b11..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy;
-
-
-public class DefaultPolicyPartitioner implements PolicyPartitioner{
- @Override
- public int partition(int numTotalPartitions, String policyType,
- String policyId) {
- final int prime = 31;
- int result = 1;
- result = result * prime + policyType.hashCode();
- result = result < 0 ? result*-1 : result;
- result = result * prime + policyId.hashCode();
- result = result < 0 ? result*-1 : result;
- return result % numTotalPartitions;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java
deleted file mode 100644
index 7dad895..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy;
-
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.policy.executor.IPolicyExecutor;
-
-public class PolicyEvaluationContext<T extends AbstractPolicyDefinitionEntity, K> {
-
- public IPolicyExecutor<T, K> alertExecutor;
-
- public String policyId;
-
- public PolicyEvaluator<T> evaluator;
-
- public Collector outputCollector;
-
- public ResultRender<T, K> resultRender;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java
deleted file mode 100644
index 46a63ee..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.dataproc.core.ValuesArray;
-
-/***
- *
- * @param <T> - The policy definition entity
- */
-public interface PolicyEvaluator<T extends AbstractPolicyDefinitionEntity> {
- /**
- * take input and evaluate expression
- * input has 3 fields, first is siddhiAlertContext, second one is streamName, the third is map of attribute name/value
- * @param input
- * @throws Exception
- */
- public void evaluate(ValuesArray input) throws Exception;
-
- /**
- * notify policy evaluator that policy is updated
- */
- public void onPolicyUpdate(T newAlertDef);
-
- /**
- * notify policy evaluator that policy is deleted, here is cleanup work for this policy evaluator
- */
- public void onPolicyDelete();
-
- /**
- * get additional context
- */
- public Map<String, String> getAdditionalContext();
-
- /**
- * Get markdown status for the policy.
- */
- public boolean isMarkdownEnabled();
-
- /**
- * Get markdown reason for the given policy.
- */
- public String getMarkdownReason();
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java
deleted file mode 100644
index fa9620c..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy;
-
-import java.io.Serializable;
-
-/**
- * partition policies so that policies can be distributed into different alert evaluators
- */
-public interface PolicyPartitioner extends Serializable {
- int partition(int numTotalPartitions, String policyType, String policyId);
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java
deleted file mode 100644
index cc59880..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-
-import java.util.List;
-
-/**
- * @since Dec 17, 2015
- *
- */
-public interface ResultRender<T extends AbstractPolicyDefinitionEntity, K> {
-
- K render(Config config, List<Object> rets, PolicyEvaluationContext<T, K> siddhiAlertContext, long timestamp);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java
deleted file mode 100644
index c9d28a2..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.executor;
-
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.policy.siddhi.SiddhiEvaluationHandler;
-
-/**
- * Created on 1/10/16.
- */
-public interface IPolicyExecutor<T extends AbstractPolicyDefinitionEntity, K> extends SiddhiEvaluationHandler<T, K> {
- String getExecutorId();
-
- int getPartitionSeq();
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java
deleted file mode 100644
index 2e8fc55..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-
-import java.util.List;
-
-public interface SiddhiEvaluationHandler<T extends AbstractPolicyDefinitionEntity, K> {
-
- void onEvalEvents(PolicyEvaluationContext<T, K> context, List<K> alerts);
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties b/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties
deleted file mode 100644
index d59ded6..0000000
--- a/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
index 86d8bc4..e2ac91a 100644
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
@@ -30,7 +30,7 @@ import com.typesafe.config.ConfigFactory;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
import org.apache.eagle.app.sink.StormStreamSink;
-import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
import org.apache.eagle.gc.executor.GCLogAnalyzerBolt;
import org.apache.eagle.gc.executor.GCMetricGeneratorBolt;
import storm.kafka.StringScheme;
@@ -47,7 +47,7 @@ public class GCLogApplication extends StormApplication{
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
TopologyBuilder builder = new TopologyBuilder();
- NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
+ KafkaSpoutProvider provider = new KafkaSpoutProvider();
IRichSpout spout = provider.getSpout(config);
int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/pom.xml b/eagle-hadoop-metric/pom.xml
index 0d612df..15eea00 100644
--- a/eagle-hadoop-metric/pom.xml
+++ b/eagle-hadoop-metric/pom.xml
@@ -36,28 +36,4 @@
<version>${project.version}</version>
</dependency>
</dependencies>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptor>src/assembly/eagle-hadoop-metric-assembly.xml</descriptor>
- <finalName>eagle-hadoop-metric-${project.version}</finalName>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <tarLongFileMode>posix</tarLongFileMode>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/assembly/eagle-hadoop-metric-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/assembly/eagle-hadoop-metric-assembly.xml b/eagle-hadoop-metric/src/assembly/eagle-hadoop-metric-assembly.xml
deleted file mode 100644
index b581fbc..0000000
--- a/eagle-hadoop-metric/src/assembly/eagle-hadoop-metric-assembly.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
- <id>assembly</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <dependencySets>
- <dependencySet>
- <outputDirectory>/</outputDirectory>
- <useProjectArtifact>false</useProjectArtifact>
- <unpack>true</unpack>
- <scope>runtime</scope>
- <unpackOptions>
- <excludes>
- <exclude>**/application.conf</exclude>
- <exclude>**/defaults.yaml</exclude>
- <exclude>**/*storm.yaml</exclude>
- <exclude>**/*storm.yaml.1</exclude>
- <exclude>**/log4j.properties</exclude>
- </excludes>
- </unpackOptions>
- <excludes>
- <exclude>org.apache.storm:storm-core</exclude>
- <exclude>org.slf4j:slf4j-api</exclude>
- <exclude>org.slf4j:log4j-over-slf4j</exclude>
- <exclude>org.slf4j:slf4j-log4j12</exclude>
- <exclude>log4j:log4j</exclude>
- <exclude>asm:asm</exclude>
- <exclude>org.apache.log4j.wso2:log4j</exclude>
- </excludes>
- </dependencySet>
- </dependencySets>
-
- <fileSets>
- <fileSet>
- <directory>${project.build.outputDirectory}</directory>
- <outputDirectory>/</outputDirectory>
- <excludes>
- <exclude>application.conf</exclude>
- <exclude>log4j.properties</exclude>
- <exclude>**/storm.yaml.1</exclude>
- </excludes>
- </fileSet>
- </fileSets>
-</assembly>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
index 20ef5d0..40d1a24 100644
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
@@ -18,19 +18,51 @@
package org.apache.eagle.hadoop.metric;
import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
+import storm.kafka.StringScheme;
/**
* Since 8/12/16.
+ * This application just pass through data from jmx metric
+ * For persistence or alert purpose, it is not necessary to start application
+ * But keep this application in case of future business process
+ *
+ * Note: this application should be run as multiple instances based on different topic for data source
*/
public class HadoopJmxApplication extends StormApplication {
+ public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+ public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
+ public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
+
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
+ int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+ int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
+ int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+
TopologyBuilder builder = new TopologyBuilder();
+
+ KafkaSpoutProvider provider = new KafkaSpoutProvider();
+ IRichSpout spout = provider.getSpout(config);
+ builder.setSpout("ingest", spout, numOfSpoutTasks);
+
+ JsonParserBolt bolt = new JsonParserBolt();
+ BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", bolt, numOfParserTasks);
+ boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
+
+ StormStreamSink sinkBolt = environment.getStreamSink("hadoop_jmx_stream",config);
+ BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
+ kafkaBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
+
return builder.createTopology();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java
new file mode 100644
index 0000000..7ca5ba6
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.hadoop.metric;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Since 8/14/16.
+ */
+public class JsonParserBolt extends BaseRichBolt {
+ private Logger LOG = LoggerFactory.getLogger(JsonParserBolt.class);
+ private OutputCollector collector;
+ private ObjectMapper mapper = new ObjectMapper();
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ String msg = input.getString(0);
+ try {
+ Map ret = mapper.readValue(msg, Map.class);
+ collector.emit(Arrays.asList(ret));
+ }catch(Exception ex){
+ LOG.error("error in passing json message", ex);
+ }finally{
+ collector.ack(input);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("msg"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java
deleted file mode 100644
index 173441c..0000000
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.hadoop.metric;
-
-import backtype.storm.spout.SchemeAsMultiScheme;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created on 1/25/16.
- */
-public class Utils {
-
- /**
- * Creates a spout provider that have host-metric as the first tuple data, so that it's feasible for alert grouping.
- *
- * @param config
- * @return
- */
- public static KafkaSourcedSpoutProvider createProvider(Config config) {
- String deserClsName = config.getString("dataSourceConfig.deserializerClass");
- final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
-
- @Override
- public List<Object> deserialize(byte[] ser) {
- Object tmp = deserializer.deserialize(ser);
- Map<String, Object> map = (Map<String, Object>) tmp;
- if (tmp == null) return null;
- // this is the key to be grouped by
- return Arrays.asList(String.format("%s-%s", map.get("host"), map.get("metric")), tmp);
- }
-
- };
-
- KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
-
- @Override
- public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
- return new SchemeAsMultiScheme(scheme);
- }
-
- };
- return provider;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/application.conf b/eagle-hadoop-metric/src/main/resources/application.conf
index dc1c7f3..e75355f 100644
--- a/eagle-hadoop-metric/src/main/resources/application.conf
+++ b/eagle-hadoop-metric/src/main/resources/application.conf
@@ -14,56 +14,38 @@
# limitations under the License.
{
- "envContextConfig" : {
- "env" : "storm",
- "mode" : "local",
- "topologyName" : "hadoopJmxMetricTopology",
- "stormConfigFile" : "hadoopjmx.yaml",
- "parallelismConfig" : {
- "kafkaMsgConsumer" : 1,
- "hadoopJmxMetricAlertExecutor*" : 1
- }
+ "appId" : "HadoopJmxApplication",
+ "mode" : "LOCAL",
+ "siteId" : "testsite",
+ "topology" : {
+ "numOfSpoutTasks" : 2,
+ "numOfParserTasks" : 2,
+ "numOfSinkTasks" : 2
},
"dataSourceConfig": {
- "topic" : "nn_jmx_metric_sandbox",
- "zkConnection" : "sandbox.hortonworks.com:2181",
+ "topic" : "jmx_metric",
+ "zkConnection" : "server.eagle.apache.org:2181",
"zkConnectionTimeoutMS" : 15000,
"consumerGroupId" : "EagleConsumer",
"fetchSize" : 1048586,
- "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
- "transactionZKServers" : "sandbox.hortonworks.com",
+ "transactionZKServers" : "server.eagle.apache.org",
"transactionZKPort" : 2181,
"transactionZKRoot" : "/consumers",
"transactionStateUpdateMS" : 2000
- },
- "alertExecutorConfigs" : {
- "hadoopJmxMetricAlertExecutor" : {
- "parallelism" : 1,
- "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
- "needValidation" : "true"
- }
+ "schemeCls" : "storm.kafka.StringScheme"
},
"eagleProps" : {
- "site" : "sandbox",
- "application": "hadoopJmxMetricDataSource",
- "dataJoinPollIntervalSec" : 30,
- "mailHost" : "mailHost.com",
- "mailSmtpPort":"25",
- "mailDebug" : "true",
- "balancePartitionEnabled" : true,
- #"partitionRefreshIntervalInMin" : 60,
- #"kafkaStatisticRangeInMin" : 60,
"eagleService": {
"host": "localhost",
- "port": 9099,
+ "port": 9090,
"username": "admin",
"password": "secret"
}
- "readHdfsUserCommandPatternFrom" : "file"
},
- "dynamicConfigSource" : {
- "enabled" : true,
- "initDelayMillis" : 0,
- "delayMillis" : 30000
+ "dataSinkConfig": {
+ "topic" : "jmx_metric_parsed",
+ "brokerList" : "server.eagle.apache.org:6667",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
index c738b90..4ef50ea 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
@@ -17,7 +17,6 @@
package org.apache.eagle.metric.kafka;
import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
@@ -26,16 +25,8 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpout;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
-
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -62,69 +53,9 @@ public class EagleMetricCollectorApplication extends StormApplication{
}
};
- // TODO: Refactored the anonymous in to independen class file, avoiding too complex logic in main method
- KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() {
- @Override
- public BaseRichSpout getSpout(Config context) {
- // Kafka topic
- String topic = context.getString("dataSourceConfig.topic");
- // Kafka consumer group id
- String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId");
- // Kafka fetch size
- int fetchSize = context.getInt("dataSourceConfig.fetchSize");
- // Kafka deserializer class
- String deserClsName = context.getString("dataSourceConfig.deserializerClass");
-
- // Kafka broker zk connection
- String zkConnString = context.getString("dataSourceConfig.zkQuorum");
-
- // transaction zkRoot
- String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
-
- LOG.info(String.format("Use topic id: %s",topic));
-
- String brokerZkPath = null;
- if(context.hasPath("dataSourceConfig.brokerZkPath")) {
- brokerZkPath = context.getString("dataSourceConfig.brokerZkPath");
- }
-
- BrokerHosts hosts;
- if(brokerZkPath == null) {
- hosts = new ZkHosts(zkConnString);
- } else {
- hosts = new ZkHosts(zkConnString, brokerZkPath);
- }
-
- SpoutConfig spoutConfig = new SpoutConfig(hosts,
- topic,
- zkRoot + "/" + topic,
- groupId);
-
- // transaction zkServers
- String[] zkConnections = zkConnString.split(",");
- List<String> zkHosts = new ArrayList<>();
- for (String zkConnection : zkConnections) {
- zkHosts.add(zkConnection.split(":")[0]);
- }
- Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
-
- spoutConfig.zkServers = zkHosts;
- // transaction zkPort
- spoutConfig.zkPort = zkPort;
- // transaction update interval
- spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
- // Kafka fetch size
- spoutConfig.fetchSizeBytes = fetchSize;
-
- spoutConfig.scheme = new SchemeAsMultiScheme(scheme);
- KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
- return kafkaSpout;
- }
- };
-
TopologyBuilder builder = new TopologyBuilder();
BaseRichSpout spout1 = new KafkaOffsetSourceSpoutProvider().getSpout(config);
- BaseRichSpout spout2 = kafkaMessageSpoutProvider.getSpout(config);
+ BaseRichSpout spout2 = KafkaSourcedSpoutProvider.getSpout(config, scheme);
int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
int numOfDistributionTasks = config.getInt(DISTRIBUTION_TASK_NUM);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutProvider.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutProvider.java
new file mode 100644
index 0000000..382ae1a
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpout;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Since 8/14/16.
+ */
+public class KafkaSourcedSpoutProvider {
+ private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class);
+ public static BaseRichSpout getSpout(Config context, Scheme scheme) {
+ // Kafka topic
+ String topic = context.getString("dataSourceConfig.topic");
+ // Kafka consumer group id
+ String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId");
+ // Kafka fetch size
+ int fetchSize = context.getInt("dataSourceConfig.fetchSize");
+ // Kafka deserializer class
+ String deserClsName = context.getString("dataSourceConfig.deserializerClass");
+
+ // Kafka broker zk connection
+ String zkConnString = context.getString("dataSourceConfig.zkQuorum");
+
+ // transaction zkRoot
+ String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
+
+ LOG.info(String.format("Use topic id: %s",topic));
+
+ String brokerZkPath = null;
+ if(context.hasPath("dataSourceConfig.brokerZkPath")) {
+ brokerZkPath = context.getString("dataSourceConfig.brokerZkPath");
+ }
+
+ BrokerHosts hosts;
+ if(brokerZkPath == null) {
+ hosts = new ZkHosts(zkConnString);
+ } else {
+ hosts = new ZkHosts(zkConnString, brokerZkPath);
+ }
+
+ SpoutConfig spoutConfig = new SpoutConfig(hosts,
+ topic,
+ zkRoot + "/" + topic,
+ groupId);
+
+ // transaction zkServers
+ String[] zkConnections = zkConnString.split(",");
+ List<String> zkHosts = new ArrayList<>();
+ for (String zkConnection : zkConnections) {
+ zkHosts.add(zkConnection.split(":")[0]);
+ }
+ Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
+
+ spoutConfig.zkServers = zkHosts;
+ // transaction zkPort
+ spoutConfig.zkPort = zkPort;
+ // transaction update interval
+ spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
+ // Kafka fetch size
+ spoutConfig.fetchSizeBytes = fetchSize;
+
+ spoutConfig.scheme = new SchemeAsMultiScheme(scheme);
+ KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
+ return kafkaSpout;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java
new file mode 100644
index 0000000..14b2384
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
+import org.apache.eagle.datastream.utils.NameConstants;
+
+import java.lang.reflect.Constructor;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * This scheme defines how a kafka message is deserialized and the output field name for storm stream
+ * it includes the following:
+ * 1. data source is kafka, so need kafka message deserializer class
+ * 2. output field declaration
+ */
+public class KafkaSourcedSpoutScheme implements Scheme {
+ protected SpoutKafkaMessageDeserializer deserializer;
+
+ public KafkaSourcedSpoutScheme(String deserClsName, Config context){
+ try{
+ Properties prop = new Properties();
+ if(context.hasPath("eagleProps")) {
+ prop.putAll(context.getObject("eagleProps"));
+ }
+ Constructor<?> constructor = Class.forName(deserClsName).getConstructor(Properties.class);
+ deserializer = (SpoutKafkaMessageDeserializer) constructor.newInstance(prop);
+ }catch(Exception ex){
+ throw new RuntimeException("Failed to create new instance for decoder class " + deserClsName, ex);
+ }
+ }
+
+ @Override
+ public List<Object> deserialize(byte[] ser) {
+ Object tmp = deserializer.deserialize(ser);
+ if(tmp == null)
+ return null;
+ // the following tasks are executed within the same process of kafka spout
+ return Arrays.asList(tmp);
+ }
+
+ /**
+ * Default only f0, but it requires to be overrode if different
+ *
+ * TODO: Handle the schema with KeyValue based structure
+ *
+ * @return Fields
+ */
+ @Override
+ public Fields getOutputFields() {
+ return new Fields(NameConstants.FIELD_PREFIX()+"0");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
index 432043f..f5753cf 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
@@ -26,7 +26,7 @@ import com.typesafe.config.ConfigFactory;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
import org.apache.eagle.app.sink.StormStreamSink;
-import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
/**
* Since 7/27/16.
@@ -40,7 +40,7 @@ public class HBaseAuditLogApplication extends StormApplication {
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
TopologyBuilder builder = new TopologyBuilder();
- NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
+ KafkaSpoutProvider provider = new KafkaSpoutProvider();
IRichSpout spout = provider.getSpout(config);
HBaseAuditLogParserBolt bolt = new HBaseAuditLogParserBolt();