You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/13 06:46:20 UTC
[2/2] incubator-eagle git commit: EAGLE-444 convert eagle-gc app to
use new app framework convert eagle-gc app to use new app framework
EAGLE-444 convert eagle-gc app to use new app framework
convert eagle-gc app to use new app framework
Author: @yonzhang2012 <yo...@gmail.com>
Closes: #339
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/98458658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/98458658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/98458658
Branch: refs/heads/develop
Commit: 984586580b3840f254f36e2f24a786c1eca98370
Parents: 15e1c83
Author: yonzhang <yo...@gmail.com>
Authored: Fri Aug 12 23:50:10 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Fri Aug 12 23:50:10 2016 -0700
----------------------------------------------------------------------
.../kafka/NewKafkaSourcedSpoutProvider.java | 118 ++++++++++
eagle-gc/pom.xml | 5 +
.../org/apache/eagle/gc/GCLogApplication.java | 79 +++++++
.../eagle/gc/GCLogApplicationProvider.java | 34 +++
.../org/apache/eagle/gc/GCLogProcessorMain.java | 42 ----
.../gc/executor/GCLogAnalysorExecutor.java | 77 -------
.../eagle/gc/executor/GCLogAnalyzerBolt.java | 85 +++++++
.../gc/executor/GCMetricGeneratorBolt.java | 125 +++++++++++
.../gc/executor/GCMetricGeneratorExecutor.java | 108 ---------
.../eagle/gc/spout/GCLogDeserializer.java | 39 ----
....security.hbase.GCLogApplicationProvider.xml | 221 +++++++++++++++++++
...org.apache.eagle.app.spi.ApplicationProvider | 37 ++++
eagle-gc/src/main/resources/application.conf | 48 ++--
eagle-gc/src/main/resources/gc-storm.yaml | 18 --
eagle-gc/src/main/resources/log4j.properties | 3 -
eagle-hadoop-metric/pom.xml | 5 +
.../hadoop/metric/HadoopJmxApplication.java | 42 ++++
.../hadoop/metric/HadoopJmxMetricMonitor.java | 11 +-
.../HadoopJmxMetricMonitoringTopology.java | 37 ----
.../kafka/EagleMetricCollectorApplication.java | 148 +++++++++++++
.../metric/kafka/EagleMetricCollectorMain.java | 122 ----------
.../kafka/KafkaMessageDistributionBolt.java | 114 ++++++++++
.../kafka/KafkaMessageDistributionExecutor.java | 103 ---------
.../topo/NewKafkaSourcedSpoutProvider.java | 115 ----------
.../hbase/HBaseAuditLogAppProvider.java | 7 -
.../hbase/HBaseAuditLogApplication.java | 3 +-
.../AbstractHdfsAuditLogApplication.java | 3 +-
.../securitylog/HdfsAuthLogMonitoringMain.java | 3 +-
.../eagle-security-oozie-auditlog/pom.xml | 5 +
.../oozie/parse/OozieAuditLogApplication.java | 75 +++++++
.../oozie/parse/OozieAuditLogParserBolt.java | 86 ++++++++
.../oozie/parse/OozieAuditLogProcessorMain.java | 33 ---
.../OozieResourceSensitivityDataJoinBolt.java | 108 +++++++++
...ozieResourceSensitivityDataJoinExecutor.java | 89 --------
34 files changed, 1315 insertions(+), 833 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java
new file mode 100644
index 0000000..d764ac1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * *
+ *
+ */
+
+package org.apache.eagle.dataproc.impl.storm.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpout;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+
+import java.util.Arrays;
+
+/**
+ * Builds a {@link KafkaSpout} from the Kafka settings found under a config path ("dataSourceConfig" by default). Since 6/8/16.
+ */
+public class NewKafkaSourcedSpoutProvider implements StormSpoutProvider {
+ private final static Logger LOG = LoggerFactory.getLogger(NewKafkaSourcedSpoutProvider.class);
+ // Path of the sub-config that holds the Kafka settings.
+ private String configPrefix = "dataSourceConfig";
+ // Keeps the default "dataSourceConfig" prefix.
+ public NewKafkaSourcedSpoutProvider(){}
+ // Uses the given path instead; with a null prefix, getSpout reads the keys directly from the config it is passed.
+ public NewKafkaSourcedSpoutProvider(String prefix){
+ this.configPrefix = prefix;
+ }
+ // Always reads topic, consumerGroupId, fetchSize, zkConnection, transactionZKRoot, transactionZKServers, transactionZKPort, transactionStateUpdateMS; brokerZkPath, startOffsetTime and forceFromStart are optional.
+ @Override
+ public BaseRichSpout getSpout(Config config){
+ Config context = config;
+ if(this.configPrefix!=null) context = config.getConfig(configPrefix);
+ // Kafka topic
+ String topic = context.getString("topic");
+ // Kafka consumer group id
+ String groupId = context.getString("consumerGroupId");
+ // Kafka fetch size
+ int fetchSize = context.getInt("fetchSize");
+ // Kafka broker zk connection
+ String zkConnString = context.getString("zkConnection");
+ // transaction zkRoot; the SpoutConfig below receives zkRoot + "/" + topic
+ String zkRoot = context.getString("transactionZKRoot");
+
+ LOG.info(String.format("Use topic id: %s",topic));
+ // Optional "brokerZkPath": when present it is passed to ZkHosts as the second constructor argument.
+ String brokerZkPath = null;
+ if(context.hasPath("brokerZkPath")) {
+ brokerZkPath = context.getString("brokerZkPath");
+ }
+
+ BrokerHosts hosts;
+ if(brokerZkPath == null) {
+ hosts = new ZkHosts(zkConnString);
+ } else {
+ hosts = new ZkHosts(zkConnString, brokerZkPath);
+ }
+
+ SpoutConfig spoutConfig = new SpoutConfig(hosts,
+ topic,
+ zkRoot + "/" + topic,
+ groupId);
+
+ // transaction zkServers (comma-separated list in the config)
+ spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(","));
+ // transaction zkPort
+ spoutConfig.zkPort = context.getInt("transactionZKPort");
+ // transaction update interval
+ spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS");
+ // Kafka fetch size
+ spoutConfig.fetchSizeBytes = fetchSize;
+ // "startOffsetTime" is for test usage, prod should not use this
+ if (context.hasPath("startOffsetTime")) {
+ spoutConfig.startOffsetTime = context.getInt("startOffsetTime");
+ }
+ // "forceFromStart" is for test usage, prod should not use this; only the key's presence is checked (its value is never read) and it overrides any "startOffsetTime" set above
+ if (context.hasPath("forceFromStart")) {
+ spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+ }
+ // "schemeCls" must name a class that can be cast to Scheme; a missing key or a failed instantiation ends in IllegalStateException.
+ if (context.hasPath("schemeCls")) {
+ try {
+ Scheme s = (Scheme)Class.forName(context.getString("schemeCls")).newInstance();
+ spoutConfig.scheme = new SchemeAsMultiScheme(s);
+ }catch(Exception ex){
+ LOG.error("error instantiating scheme object");
+ throw new IllegalStateException(ex);
+ }
+ }else{
+ String err = "schemeCls must be present";
+ LOG.error(err);
+ throw new IllegalStateException(err);
+ }
+ return new KafkaSpout(spoutConfig);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-gc/pom.xml b/eagle-gc/pom.xml
index 66400cb..cb31156 100644
--- a/eagle-gc/pom.xml
+++ b/eagle-gc/pom.xml
@@ -46,5 +46,10 @@
<artifactId>eagle-alert-process</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-app-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
new file mode 100644
index 0000000..86d8bc4
--- /dev/null
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.eagle.gc;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider;
+import org.apache.eagle.gc.executor.GCLogAnalyzerBolt;
+import org.apache.eagle.gc.executor.GCMetricGeneratorBolt;
+import storm.kafka.StringScheme;
+
+/**
+ * Storm application that wires the GC log topology: spout "ingest" -> "analyzerBolt" -> "generatorBolt" -> "kafkaSink". Since 8/12/16.
+ */
+public class GCLogApplication extends StormApplication{
+ public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+ public final static String ANALYZER_TASK_NUM = "topology.numOfAnalyzerTasks";
+ public final static String GENERATOR_TASK_NUM = "topology.numOfGeneratorTasks";
+ public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
+ // Builds and returns the topology; the number passed as third argument to each setSpout/setBolt call is read from the topology.numOf* keys above.
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ TopologyBuilder builder = new TopologyBuilder();
+ NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
+ IRichSpout spout = provider.getSpout(config);
+
+ int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+ int numOfAnalyzerTasks = config.getInt(ANALYZER_TASK_NUM);
+ int numOfGeneratorTasks = config.getInt(GENERATOR_TASK_NUM);
+ int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+
+ builder.setSpout("ingest", spout, numOfSpoutTasks);
+ // Parse stage, grouped on StringScheme.STRING_SCHEME_KEY; assumes the configured schemeCls emits a field with that name — TODO confirm.
+ GCLogAnalyzerBolt bolt = new GCLogAnalyzerBolt();
+ BoltDeclarer boltDeclarer = builder.setBolt("analyzerBolt", bolt, numOfAnalyzerTasks);
+ boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
+ // Metric stage, grouped on "f1"; NOTE(review): GCLogAnalyzerBolt emits the constant "GCLog" as f1, so every tuple has the same grouping key — confirm this is intended.
+ GCMetricGeneratorBolt generatorBolt = new GCMetricGeneratorBolt(config);
+ BoltDeclarer joinBoltDeclarer = builder.setBolt("generatorBolt", generatorBolt, numOfGeneratorTasks);
+ joinBoltDeclarer.fieldsGrouping("analyzerBolt", new Fields("f1"));
+ // Sink stage: the bolt is obtained from the environment for the id "gc_log_stream" and is again grouped on "f1".
+ StormStreamSink sinkBolt = environment.getStreamSink("gc_log_stream",config);
+ BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
+ kafkaBoltDeclarer.fieldsGrouping("generatorBolt", new Fields("f1"));
+ return builder.createTopology();
+ }
+ // Standalone entry point: loads configuration with ConfigFactory.load() and runs the application with it.
+ public static void main(String[] args){
+ Config config = ConfigFactory.load();
+ GCLogApplication app = new GCLogApplication();
+ app.run(config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplicationProvider.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplicationProvider.java
new file mode 100644
index 0000000..eae170a
--- /dev/null
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplicationProvider.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * *
+ *
+ */
+
+package org.apache.eagle.gc;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+/**
+ * Application provider that returns a new {@link GCLogApplication} from every getApplication() call. Since 8/12/16.
+ */
+public class GCLogApplicationProvider extends AbstractApplicationProvider<GCLogApplication> { // NOTE(review): the descriptor added by this commit is META-INF/providers/org.apache.eagle.security.hbase.GCLogApplicationProvider.xml, which does not match this class's package (org.apache.eagle.gc) — verify the provider can locate it
+ @Override
+ public GCLogApplication getApplication() {
+ return new GCLogApplication();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogProcessorMain.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogProcessorMain.java
deleted file mode 100644
index 278a5bc..0000000
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogProcessorMain.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.gc;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.gc.executor.GCLogAnalysorExecutor;
-import org.apache.eagle.gc.executor.GCMetricGeneratorExecutor;
-
-public class GCLogProcessorMain {
-
- public static void main(String[] args) throws Exception{
- StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
- Config config = env.getConfig();
- KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider();
- GCLogAnalysorExecutor logAnalysor = new GCLogAnalysorExecutor();
- env.fromSpout(provider.getSpout(config)).withOutputFields(1).nameAs("kafkaMsgConsumer")
- .flatMap(logAnalysor)
- .flatMap(new GCMetricGeneratorExecutor())
- .alertWithConsumer("NNGCLogStream", "NNGCAlert");
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java
deleted file mode 100644
index e74bc44..0000000
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.gc.executor;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.gc.model.GCPausedEvent;
-import org.apache.eagle.gc.stream.GCStreamBuilder;
-import org.apache.eagle.gc.parser.exception.IgnoredLogFormatException;
-import org.apache.eagle.gc.parser.exception.UnrecognizedLogFormatException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.List;
-import java.util.Map;
-
-public class GCLogAnalysorExecutor extends JavaStormStreamExecutor2<String, Map> {
-
- public final static Logger LOG = LoggerFactory.getLogger(GCLogAnalysorExecutor.class);
-
- private Config config;
-
- private long previousLogTime;
-
- @Override
- public void prepareConfig(Config config) {
- this.config = config;
- }
-
- @Override
- public void init() {
-
- }
-
- @Override
- public void flatMap(List<Object> input, Collector<Tuple2<String, Map>> collector) {
- String log = (String)input.get(0);
- GCStreamBuilder builder = new GCStreamBuilder();
- try {
- GCPausedEvent pauseEvent = builder.build(log);
- // Because some gc log like concurrent mode failure may miss timestamp info, so we set the previous log's timestamp for it
- if (pauseEvent.getTimestamp() == 0) {
- pauseEvent.setTimestamp(previousLogTime);
- }
- previousLogTime = pauseEvent.getTimestamp();
- collector.collect(new Tuple2("GCLog", pauseEvent.toMap()));
- }
- catch (IgnoredLogFormatException ex1) {
- //DO nothing
- }
- catch (UnrecognizedLogFormatException ex2) {
- LOG.warn(ex2.getMessage());
- }
- catch (Exception ex3) {
- LOG.error("Got an exception when parsing log: ", ex3);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalyzerBolt.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalyzerBolt.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalyzerBolt.java
new file mode 100644
index 0000000..59720a3
--- /dev/null
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalyzerBolt.java
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.gc.executor;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.gc.model.GCPausedEvent;
+import org.apache.eagle.gc.stream.GCStreamBuilder;
+import org.apache.eagle.gc.parser.exception.IgnoredLogFormatException;
+import org.apache.eagle.gc.parser.exception.UnrecognizedLogFormatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class GCLogAnalyzerBolt extends BaseRichBolt { // Storm bolt that parses one GC log line per tuple into a GCPausedEvent and emits it as a map
+ public final static Logger LOG = LoggerFactory.getLogger(GCLogAnalyzerBolt.class);
+ private OutputCollector collector;
+ private long previousLogTime; // timestamp of the last successfully parsed event; an instance field, so it starts at 0
+ // Keeps the collector for use in execute().
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+ // Output tuples have two fields, "f1" and "f2".
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("f1", "f2"));
+ }
+
+ // Parses field 0 of the input as a GC log line and, on success, emits ("GCLog", event map).
+ // The input is acked in every case, including when the line is ignored or fails to parse.
+ @Override
+ public void execute(Tuple input) {
+ String log = input.getString(0);
+ GCStreamBuilder builder = new GCStreamBuilder();
+ try {
+ GCPausedEvent pauseEvent = builder.build(log);
+ // Some gc logs (e.g. concurrent mode failure) may lack timestamp info, so reuse the previous log's timestamp for them
+ if (pauseEvent.getTimestamp() == 0) {
+ pauseEvent.setTimestamp(previousLogTime);
+ }
+ previousLogTime = pauseEvent.getTimestamp();
+ collector.emit(Arrays.asList("GCLog", pauseEvent.toMap())); // f1 = constant "GCLog", f2 = event map; the input tuple is not passed to emit
+ }
+ catch (IgnoredLogFormatException ex1) {
+ // Deliberately ignored: lines in this format are skipped without logging
+ }
+ catch (UnrecognizedLogFormatException ex2) {
+ LOG.warn(ex2.getMessage());
+ }
+ catch (Exception ex3) {
+ LOG.error("Got an exception when parsing log: ", ex3);
+ }finally {
+ collector.ack(input);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java
new file mode 100644
index 0000000..2d1023b
--- /dev/null
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.gc.executor;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.common.config.EagleConfigHelper;
+import org.apache.eagle.datastream.*;
+import org.apache.eagle.gc.common.GCConstants;
+import org.apache.eagle.gc.model.GCPausedEvent;
+import org.apache.eagle.metric.reportor.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.*;
+
+public class GCMetricGeneratorBolt extends BaseRichBolt {
+ // Storm bolt that derives GC pause and heap usage metrics from each event map and then re-emits the input values.
+ public final static Logger LOG = LoggerFactory.getLogger(GCMetricGeneratorBolt.class);
+ private Config config;
+ private MetricRegistry registry;
+ private String gcPausedTimeMetricName;
+ private String youngHeapUsageMetricName;
+ private String tenuredHeapUsageMetricName;
+ private String totalHeapUsageMetricName;
+ private Map<String, String> dimensions;
+ private List<EagleMetric> metrics = new ArrayList<>();
+ // metrics above is a reusable buffer: execute() fills it, hands it to the listener and clears it for every tuple.
+ private EagleServiceReporterMetricListener listener;
+ private OutputCollector collector;
+ // The config is only stored here; it is read later in prepare().
+ public GCMetricGeneratorBolt(Config config){
+ this.config = config;
+ }
+ // Creates the registry, the service listener (host, port, user, password from config) and the site/application dimensions.
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ registry = new MetricRegistry();
+ String host = EagleConfigHelper.getServiceHost(config);
+ int port = EagleConfigHelper.getServicePort(config);
+ String username = EagleConfigHelper.getServiceUser(config);
+ String password = EagleConfigHelper.getServicePassword(config);
+ listener = new EagleServiceReporterMetricListener(host, port, username, password);
+ dimensions = new HashMap<>();
+ dimensions.put(EagleConfigConstants.SITE, EagleConfigHelper.getSite(config));
+ dimensions.put(EagleConfigConstants.APPLICATION, EagleConfigHelper.getApplication(config));
+ gcPausedTimeMetricName = MetricKeyCodeDecoder.codeMetricKey(GCConstants.GC_PAUSE_TIME_METRIC_NAME, dimensions);
+
+ this.collector = collector;
+ }
+ // Output tuples have two fields, "f1" and "f2".
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("f1", "f2"));
+ }
+ public void registerMetricIfMissing(String metricName, EagleMetric metric) { // attaches the listener and registers the metric only when the registry has no entry for metricName yet
+ if (registry.getMetrics().get(metricName) == null) {
+ metric.registerListener(listener);
+ registry.register(metricName, metric);
+ }
+ }
+ @Override
+ public void execute(Tuple input) {
+ try {
+ Map<String, Object> map = (Map<String, Object>) input.getValue(1); // assumes field 1 carries the event map emitted upstream — TODO confirm
+ GCPausedEvent event = new GCPausedEvent(map);
+ // Generate gc paused time metric
+ EagleCounterMetric metric = new EagleCounterMetric(event.getTimestamp(), gcPausedTimeMetricName, event.getPausedGCTimeSec(), GCConstants.GC_PAUSE_TIME_METRIC_GRANULARITY);
+ registerMetricIfMissing(gcPausedTimeMetricName, metric); // NOTE(review): the name is fixed after prepare(), so only the first counter object is ever registered — confirm later pause times are still reported
+
+ // Generate young heap usage metric
+ if (event.isYoungAreaGCed()) {
+ youngHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_YOUNG_MEMORY_METRIC_NAME, dimensions);
+ EagleGaugeMetric metric2 = new EagleGaugeMetric(event.getTimestamp(), youngHeapUsageMetricName, event.getYoungUsedHeapK());
+ metrics.add(metric2);
+ }
+
+ // Generate tenured heap usage metric
+ if (event.isTenuredAreaGCed()) {
+ tenuredHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_TENURED_MEMORY_METRIC_NAME, dimensions);
+ EagleGaugeMetric metric3 = new EagleGaugeMetric(event.getTimestamp(), tenuredHeapUsageMetricName, event.getTenuredUsedHeapK());
+ metrics.add(metric3);
+ }
+
+ // Generate total heap usage metric
+ if (event.isTotalHeapUsageAvailable()) {
+ totalHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_TOTAL_MEMORY_METRIC_NAME, dimensions);
+ EagleGaugeMetric metric4 = new EagleGaugeMetric(event.getTimestamp(), totalHeapUsageMetricName, event.getUsedTotalHeapK());
+ metrics.add(metric4);
+ }
+ listener.onMetricFlushed(metrics); // hand this tuple's gauge metrics to the listener, then reset the buffer
+ metrics.clear();
+ collector.emit(Arrays.asList(input.getValue(0), input.getValue(1))); // forward both input values unchanged
+ }catch(Exception ex){
+ LOG.error("error in gc metric generating", ex);
+ }finally {
+ collector.ack(input);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java
deleted file mode 100644
index ebc5a6c..0000000
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.gc.executor;
-
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.common.config.EagleConfigHelper;
-import org.apache.eagle.datastream.*;
-import org.apache.eagle.gc.common.GCConstants;
-import org.apache.eagle.gc.model.GCPausedEvent;
-import org.apache.eagle.metric.reportor.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.*;
-
-public class GCMetricGeneratorExecutor extends JavaStormStreamExecutor2<String, Map> {
-
- public final static Logger LOG = LoggerFactory.getLogger(GCMetricGeneratorExecutor.class);
- private Config config;
- private MetricRegistry registry;
- private String gcPausedTimeMetricName;
- private String youngHeapUsageMetricName;
- private String tenuredHeapUsageMetricName;
- private String totalHeapUsageMetricName;
- private Map<String, String> dimensions;
- private List<EagleMetric> metrics = new ArrayList<>();
-
- private EagleServiceReporterMetricListener listener;
-
- @Override
- public void prepareConfig(Config config) {
- this.config = config;
- }
-
- @Override
- public void init() {
- registry = new MetricRegistry();
- String host = EagleConfigHelper.getServiceHost(config);
- int port = EagleConfigHelper.getServicePort(config);
- String username = EagleConfigHelper.getServiceUser(config);
- String password = EagleConfigHelper.getServicePassword(config);
- listener = new EagleServiceReporterMetricListener(host, port, username, password);
- dimensions = new HashMap<>();
- dimensions.put(EagleConfigConstants.SITE, EagleConfigHelper.getSite(config));
- dimensions.put(EagleConfigConstants.APPLICATION, EagleConfigHelper.getApplication(config));
- gcPausedTimeMetricName = MetricKeyCodeDecoder.codeMetricKey(GCConstants.GC_PAUSE_TIME_METRIC_NAME, dimensions);
- }
-
- public void registerMetricIfMissing(String metricName, EagleMetric metric) {
- if (registry.getMetrics().get(metricName) == null) {
- metric.registerListener(listener);
- registry.register(metricName, metric);
- }
- }
-
- @Override
- public void flatMap(List<Object> input, Collector<Tuple2<String, Map>> collector) {
- Map<String, Object> map = (Map<String, Object>) input.get(1);
- GCPausedEvent event = new GCPausedEvent(map);
- // Generate gc paused time metric
- EagleCounterMetric metric = new EagleCounterMetric(event.getTimestamp(), gcPausedTimeMetricName, event.getPausedGCTimeSec(), GCConstants.GC_PAUSE_TIME_METRIC_GRANULARITY);
- registerMetricIfMissing(gcPausedTimeMetricName, metric);
-
- // Generate young heap paused time metric
- if (event.isYoungAreaGCed()) {
- youngHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_YOUNG_MEMORY_METRIC_NAME, dimensions);
- EagleGaugeMetric metric2 = new EagleGaugeMetric(event.getTimestamp(), youngHeapUsageMetricName, event.getYoungUsedHeapK());
- metrics.add(metric2);
- }
-
- // Generate tenured heap paused time metric
- if (event.isTenuredAreaGCed()) {
- tenuredHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_TENURED_MEMORY_METRIC_NAME, dimensions);
- EagleGaugeMetric metric3 = new EagleGaugeMetric(event.getTimestamp(), tenuredHeapUsageMetricName, event.getTenuredUsedHeapK());
- metrics.add(metric3);
- }
-
- // Generate total heap paused time metric
- if (event.isTotalHeapUsageAvailable()) {
- totalHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_TOTAL_MEMORY_METRIC_NAME, dimensions);
- EagleGaugeMetric metric4 = new EagleGaugeMetric(event.getTimestamp(), totalHeapUsageMetricName, event.getUsedTotalHeapK());
- metrics.add(metric4);
- }
- listener.onMetricFlushed(metrics);
- metrics.clear();
- collector.collect(new Tuple2(input.get(0), input.get(1)));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/spout/GCLogDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/spout/GCLogDeserializer.java b/eagle-gc/src/main/java/org/apache/eagle/gc/spout/GCLogDeserializer.java
deleted file mode 100644
index f1b3105..0000000
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/spout/GCLogDeserializer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.gc.spout;
-
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.util.Properties;
-
-public class GCLogDeserializer implements SpoutKafkaMessageDeserializer {
-
- private Properties props;
-
- public GCLogDeserializer(Properties props){
- this.props = props;
- }
-
- @Override
- public Object deserialize(byte[] arg0) {
- return Bytes.toString(arg0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.GCLogApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.GCLogApplicationProvider.xml b/eagle-gc/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.GCLogApplicationProvider.xml
new file mode 100644
index 0000000..213132d
--- /dev/null
+++ b/eagle-gc/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.GCLogApplicationProvider.xml
@@ -0,0 +1,221 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ /*
+ ~ *
+ ~ * Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ * contributor license agreements. See the NOTICE file distributed with
+ ~ * this work for additional information regarding copyright ownership.
+ ~ * The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ * (the "License"); you may not use this file except in compliance with
+ ~ * the License. You may obtain a copy of the License at
+ ~ *
+ ~ * http://www.apache.org/licenses/LICENSE-2.0
+ ~ *
+ ~ * Unless required by applicable law or agreed to in writing, software
+ ~ * distributed under the License is distributed on an "AS IS" BASIS,
+ ~ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ * See the License for the specific language governing permissions and
+ ~ * limitations under the License.
+ ~ *
+ ~ */
+ -->
+
+<application>
+ <type>GCLogApplication</type>
+ <name>GC Log Monitoring Application</name>
+ <version>0.5.0-incubating</version>
+ <appClass>org.apache.eagle.gc.GCLogApplication</appClass>
+ <viewPath>/apps/example</viewPath>
+ <configuration>
+ <property>
+ <name>dataSourceConfig.topic</name>
+ <displayName>dataSourceConfig.topic</displayName>
+ <value>gc_log</value>
+ <description>data source topic</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkConnection</name>
+ <displayName>dataSourceConfig.zkConnection</displayName>
+ <value>server.eagle.apache.org:2181</value>
+ <description>zk connection</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkConnectionTimeoutMS</name>
+ <displayName>dataSourceConfig.zkConnectionTimeoutMS</displayName>
+ <value>15000</value>
+ <description>zk connection timeout in milliseconds</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.fetchSize</name>
+ <displayName>dataSourceConfig.fetchSize</displayName>
+ <value>1048586</value>
+ <description>kafka fetch size</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionZKServers</name>
+ <displayName>dataSourceConfig.transactionZKServers</displayName>
+ <value>server.eagle.apache.org</value>
+ <description>zookeeper server for offset transaction</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionZKPort</name>
+ <displayName>dataSourceConfig.transactionZKPort</displayName>
+ <value>2181</value>
+ <description>zookeeper server port for offset transaction</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionZKRoot</name>
+ <displayName>dataSourceConfig.transactionZKRoot</displayName>
+ <value>/consumers</value>
+ <description>offset transaction root</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.consumerGroupId</name>
+ <displayName>dataSourceConfig.consumerGroupId</displayName>
+ <value>eagle.hbaseaudit.consumer</value>
+ <description>kafka consumer group Id</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionStateUpdateMS</name>
+ <displayName>dataSourceConfig.transactionStateUpdateMS</displayName>
+ <value>2000</value>
+ <description>zookeeper offset transaction state update interval in milliseconds</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.schemeCls</name>
+ <displayName>dataSourceConfig.schemeCls</displayName>
+ <value>storm.kafka.StringScheme</value>
+ <description>scheme class</description>
+ </property>
+ <property>
+ <name>topology.numOfSpoutTasks</name>
+ <displayName>topology.numOfSpoutTasks</displayName>
+ <value>2</value>
+ <description>number of spout tasks</description>
+ </property>
+ <property>
+ <name>topology.numOfAnalyzerTasks</name>
+ <displayName>topology.numOfAnalyzerTasks</displayName>
+ <value>2</value>
+ <description>number of analyzer tasks</description>
+ </property>
+ <property>
+ <name>topology.numOfGeneratorTasks</name>
+ <displayName>topology.numOfGeneratorTasks</displayName>
+ <value>2</value>
+ <description>number of generator tasks</description>
+ </property>
+ <property>
+ <name>topology.numOfSinkTasks</name>
+ <displayName>topology.numOfSinkTasks</displayName>
+ <value>2</value>
+ <description>number of sink tasks</description>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.host</name>
+ <displayName>eagleProps.eagleService.host</displayName>
+ <value>localhost</value>
+ <description>eagle service host</description>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.port</name>
+ <displayName>eagleProps.eagleService.port</displayName>
+ <value>8080</value>
+ <description>eagle service port</description>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.username</name>
+ <displayName>eagleProps.eagleService.username</displayName>
+ <value>admin</value>
+ <description>eagle service username</description>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.password</name>
+ <displayName>eagleProps.eagleService.password</displayName>
+ <value>secret</value>
+ <description>eagle service password</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.topic</name>
+ <displayName>dataSinkConfig.topic</displayName>
+ <value>hbase_audit_log_parsed</value>
+ <description>topic for kafka data sink</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.brokerList</name>
+ <displayName>dataSinkConfig.brokerList</displayName>
+ <value>sandbox.hortonworks.com:6667</value>
+ <description>kafka broker list</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.serializerClass</name>
+ <displayName>dataSinkConfig.serializerClass</displayName>
+ <value>kafka.serializer.StringEncoder</value>
+ <description>serializer class Kafka message value</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.keySerializerClass</name>
+ <displayName>dataSinkConfig.keySerializerClass</displayName>
+ <value>kafka.serializer.StringEncoder</value>
+ <description>serializer class Kafka message key</description>
+ </property>
+ </configuration>
+ <streams>
+ <stream>
+ <streamId>gc_log_stream</streamId>
+ <description>GC Log Stream</description>
+ <validate>true</validate>
+ <timeseries>true</timeseries>
+ <columns>
+ <column>
+ <name>action</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>host</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>status</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>timestamp</name>
+ <type>long</type>
+ </column>
+ </columns>
+ </stream>
+ </streams>
+ <docs>
+ <install>
+ # Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+ ./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --partitions 1
+
+ # Step 2: Set up a data collector (for example logstash) to flow data into the kafka topic
+
+ ./bin/logstash -f log_collector.conf
+
+ ## `log_collector.conf` sample as following:
+
+ input {
+
+ }
+ filter {
+
+ }
+ output{
+
+ }
+
+ # Step 3: start application
+
+ # Step 4: monitor with featured portal or alert with policies
+ </install>
+ <uninstall>
+ # Step 1: stop and uninstall application
+ # Step 2: delete kafka topic named "${site}_example_source_topic"
+ # Step 3: stop logstash
+ </uninstall>
+ </docs>
+</application>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-gc/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..fc9406f
--- /dev/null
+++ b/eagle-gc/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,37 @@
+#
+# /*
+# *
+# * Licensed to the Apache Software Foundation (ASF) under one or more
+# * contributor license agreements. See the NOTICE file distributed with
+# * this work for additional information regarding copyright ownership.
+# * The ASF licenses this file to You under the Apache License, Version 2.0
+# * (the "License"); you may not use this file except in compliance with
+# * the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# *
+# */
+#
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.gc.GCLogApplicationProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/resources/application.conf b/eagle-gc/src/main/resources/application.conf
index ed14063..33fa733 100644
--- a/eagle-gc/src/main/resources/application.conf
+++ b/eagle-gc/src/main/resources/application.conf
@@ -13,48 +13,40 @@
# See the License for the specific language governing permissions and
# limitations under the License.
{
- "envContextConfig" : {
- "env" : "storm",
- "mode" : "local",
- "topologyName" : "GCAnalysorTopology",
- "stormConfigFile" : "gc-storm.yaml",
- "parallelismConfig" : {
- "kafkaMsgConsumer" : 1
- }
+ "appId" : "GCLogApp",
+ "mode" : "LOCAL",
+ "siteId" : "testsite",
+ "topology" : {
+ "numOfTotalWorkers" : 2,
+ "numOfSpoutTasks" : 2,
+ "numOfAnalyzerTasks" : 2,
+ "numOfGeneratorTasks" : 2,
+ "numOfSinkTasks" : 2
},
"dataSourceConfig": {
- "site" : "sandbox",
- "topic" : "sandbox-namenode-gc_log",
- "consumerGroupId" : "gc.log.eagle.consumer",
- "zkSessionTimeoutMs" : 15000,
- "zkRetryTimes" : 3,
- "zkRetryInterval" : 2000,
+ "topic" : "gc_log",
+ "zkConnection" : "server.eagle.apache.org:2181",
"zkConnectionTimeoutMS" : 15000,
"fetchSize" : 1048586,
- "deserializerClass" : "org.apache.eagle.gc.spout.GCLogDeserializer",
- "zkConnection" : "localhost:2181",
- "transactionZKServers" : "localhost",
+ "transactionZKServers" : "server.eagle.apache.org",
"transactionZKPort" : "2181",
"transactionZKRoot" : "/consumers",
+ "consumerGroupId" : "gc.log.eagle.consumer",
"transactionStateUpdateMS" : 2000,
- "kafkaEndPoints" : "localhost:6667"
+ "schemeCls" : "storm.kafka.StringScheme"
},
"eagleProps" : {
- "site" : "sandbox",
- "application": "NNGCLog",
- "mailHost" : "www.xyz.com",
- "mailSmtpPort":"25",
- "mailDebug" : "true",
"eagleService": {
"host": "localhost",
- "port": 38080,
+ "port": 9090,
"username": "admin",
"password": "secret"
}
},
- "dynamicConfigSource" : {
- "enabled" : true,
- "initDelayMillis" : 0,
- "delayMillis" : 30000
+ "dataSinkConfig": {
+ "topic" : "gc_log_parsed",
+ "brokerList" : "server.eagle.apache.org:6667",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/gc-storm.yaml
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/resources/gc-storm.yaml b/eagle-gc/src/main/resources/gc-storm.yaml
deleted file mode 100644
index a68a323..0000000
--- a/eagle-gc/src/main/resources/gc-storm.yaml
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-topology.workers: 1
-topology.acker.executors: 1
-topology.tasks: 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/resources/log4j.properties b/eagle-gc/src/main/resources/log4j.properties
index 8a0919a..f5fb8a8 100644
--- a/eagle-gc/src/main/resources/log4j.properties
+++ b/eagle-gc/src/main/resources/log4j.properties
@@ -19,9 +19,6 @@ eagle.log.dir=./logs
eagle.log.file=eagle.log
-#log4j.logger.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG
-#log4j.logger.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG
-#log4j.logger.eagle.executor.AlertExecutor=DEBUG
# standard output
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-hadoop-metric/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/pom.xml b/eagle-hadoop-metric/pom.xml
index ce13ee9..389a481 100644
--- a/eagle-hadoop-metric/pom.xml
+++ b/eagle-hadoop-metric/pom.xml
@@ -35,6 +35,11 @@
<artifactId>eagle-stream-application-manager</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-app-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
new file mode 100644
index 0000000..20ef5d0
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.metric;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+
+/**
+ * Since 8/12/16.
+ */
+public class HadoopJmxApplication extends StormApplication {
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ TopologyBuilder builder = new TopologyBuilder();
+ return builder.createTopology();
+ }
+
+ public static void main(String[] args){
+ Config config = ConfigFactory.load();
+ HadoopJmxApplication app = new HadoopJmxApplication();
+ app.run(config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java
index 9202da4..c0b183b 100644
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java
@@ -17,7 +17,6 @@
package org.apache.eagle.hadoop.metric;
import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.core.StreamProducer;
import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
/**
@@ -26,10 +25,10 @@ import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
public class HadoopJmxMetricMonitor {
public static void main(String[] args) {
- StormExecutionEnvironment env = ExecutionEnvironments.get(args, StormExecutionEnvironment.class);
- String streamName = "hadoopJmxMetricEventStream";
- StreamProducer sp = env.fromSpout(Utils.createProvider(env.getConfig())).withOutputFields(2).nameAs(streamName);
- sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor");
- env.execute();
+// StormExecutionEnvironment env = ExecutionEnvironments.get(args, StormExecutionEnvironment.class);
+// String streamName = "hadoopJmxMetricEventStream";
+// StreamProducer sp = env.fromSpout(Utils.createProvider(env.getConfig())).withOutputFields(2).nameAs(streamName);
+// sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor");
+// env.execute();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java
deleted file mode 100644
index 044a48f..0000000
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.hadoop.metric;
-
-
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.core.StreamProducer;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.stream.application.TopologyExecutable;
-
-public class HadoopJmxMetricMonitoringTopology implements TopologyExecutable {
- @Override
- public void submit(String topology, Config config) {
- StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
- String streamName = "hadoopJmxMetricEventStream";
- StreamProducer sp = env.fromSpout(Utils.createProvider(env.getConfig())).withOutputFields(2).nameAs(streamName);
- sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor");
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
new file mode 100644
index 0000000..c738b90
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpout;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Since 8/12/16.
+ */
+public class EagleMetricCollectorApplication extends StormApplication{
+ private static final Logger LOG = LoggerFactory.getLogger(EagleMetricCollectorApplication.class);
+
+ public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+ public final static String DISTRIBUTION_TASK_NUM = "topology.numOfDistributionTasks";
+
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ String deserClsName = config.getString("dataSourceConfig.deserializerClass");
+ final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
+ @Override
+ public List<Object> deserialize(byte[] ser) {
+ Object tmp = deserializer.deserialize(ser);
+ Map<String, Object> map = (Map<String, Object>)tmp;
+ if(tmp == null) return null;
+ return Arrays.asList(map.get("user"), map.get("timestamp"));
+ }
+ };
+
+ // TODO: Refactor the anonymous classes into independent class files to avoid overly complex logic in this method
+ KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() {
+ @Override
+ public BaseRichSpout getSpout(Config context) {
+ // Kafka topic
+ String topic = context.getString("dataSourceConfig.topic");
+ // Kafka consumer group id
+ String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId");
+ // Kafka fetch size
+ int fetchSize = context.getInt("dataSourceConfig.fetchSize");
+ // Kafka deserializer class
+ String deserClsName = context.getString("dataSourceConfig.deserializerClass");
+
+ // Kafka broker zk connection
+ String zkConnString = context.getString("dataSourceConfig.zkQuorum");
+
+ // transaction zkRoot
+ String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
+
+ LOG.info(String.format("Use topic id: %s",topic));
+
+ String brokerZkPath = null;
+ if(context.hasPath("dataSourceConfig.brokerZkPath")) {
+ brokerZkPath = context.getString("dataSourceConfig.brokerZkPath");
+ }
+
+ BrokerHosts hosts;
+ if(brokerZkPath == null) {
+ hosts = new ZkHosts(zkConnString);
+ } else {
+ hosts = new ZkHosts(zkConnString, brokerZkPath);
+ }
+
+ SpoutConfig spoutConfig = new SpoutConfig(hosts,
+ topic,
+ zkRoot + "/" + topic,
+ groupId);
+
+ // transaction zkServers
+ String[] zkConnections = zkConnString.split(",");
+ List<String> zkHosts = new ArrayList<>();
+ for (String zkConnection : zkConnections) {
+ zkHosts.add(zkConnection.split(":")[0]);
+ }
+ Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
+
+ spoutConfig.zkServers = zkHosts;
+ // transaction zkPort
+ spoutConfig.zkPort = zkPort;
+ // transaction update interval
+ spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
+ // Kafka fetch size
+ spoutConfig.fetchSizeBytes = fetchSize;
+
+ spoutConfig.scheme = new SchemeAsMultiScheme(scheme);
+ KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
+ return kafkaSpout;
+ }
+ };
+
+ TopologyBuilder builder = new TopologyBuilder();
+ BaseRichSpout spout1 = new KafkaOffsetSourceSpoutProvider().getSpout(config);
+ BaseRichSpout spout2 = kafkaMessageSpoutProvider.getSpout(config);
+
+ int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+ int numOfDistributionTasks = config.getInt(DISTRIBUTION_TASK_NUM);
+
+ builder.setSpout("kafkaLogLagChecker", spout1, numOfSpoutTasks);
+ builder.setSpout("kafkaMessageFetcher", spout2, numOfSpoutTasks);
+
+ KafkaMessageDistributionBolt bolt = new KafkaMessageDistributionBolt(config);
+ BoltDeclarer bolteclarer = builder.setBolt("distributionBolt", bolt, numOfDistributionTasks);
+ bolteclarer.fieldsGrouping("kafkaLogLagChecker", new Fields("f1"));
+ bolteclarer.fieldsGrouping("kafkaLogLagChecker", new Fields("f1"));
+ return builder.createTopology();
+ }
+
+
+ public static void main(String[] args){
+ Config config = ConfigFactory.load();
+ EagleMetricCollectorApplication app = new EagleMetricCollectorApplication();
+ app.run(config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
deleted file mode 100644
index 4eeda05..0000000
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric.kafka;
-
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.topology.base.BaseRichSpout;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpout;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-public class EagleMetricCollectorMain {
-
- private static final Logger LOG = LoggerFactory.getLogger(EagleMetricCollectorMain.class);
-
- public static void main(String[] args) throws Exception {
- StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
- Config config = env.getConfig();
- String deserClsName = config.getString("dataSourceConfig.deserializerClass");
- final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
- @Override
- public List<Object> deserialize(byte[] ser) {
- Object tmp = deserializer.deserialize(ser);
- Map<String, Object> map = (Map<String, Object>)tmp;
- if(tmp == null) return null;
- return Arrays.asList(map.get("user"), map.get("timestamp"));
- }
- };
-
-
- // TODO: Refactored the anonymous in to independen class file, avoiding too complex logic in main method
- KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() {
- @Override
- public BaseRichSpout getSpout(Config context) {
- // Kafka topic
- String topic = context.getString("dataSourceConfig.topic");
- // Kafka consumer group id
- String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId");
- // Kafka fetch size
- int fetchSize = context.getInt("dataSourceConfig.fetchSize");
- // Kafka deserializer class
- String deserClsName = context.getString("dataSourceConfig.deserializerClass");
-
- // Kafka broker zk connection
- String zkConnString = context.getString("dataSourceConfig.zkQuorum");
-
- // transaction zkRoot
- String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
-
- LOG.info(String.format("Use topic id: %s",topic));
-
- String brokerZkPath = null;
- if(context.hasPath("dataSourceConfig.brokerZkPath")) {
- brokerZkPath = context.getString("dataSourceConfig.brokerZkPath");
- }
-
- BrokerHosts hosts;
- if(brokerZkPath == null) {
- hosts = new ZkHosts(zkConnString);
- } else {
- hosts = new ZkHosts(zkConnString, brokerZkPath);
- }
-
- SpoutConfig spoutConfig = new SpoutConfig(hosts,
- topic,
- zkRoot + "/" + topic,
- groupId);
-
- // transaction zkServers
- String[] zkConnections = zkConnString.split(",");
- List<String> zkHosts = new ArrayList<>();
- for (String zkConnection : zkConnections) {
- zkHosts.add(zkConnection.split(":")[0]);
- }
- Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
-
- spoutConfig.zkServers = zkHosts;
- // transaction zkPort
- spoutConfig.zkPort = zkPort;
- // transaction update interval
- spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
- // Kafka fetch size
- spoutConfig.fetchSizeBytes = fetchSize;
-
- spoutConfig.scheme = new SchemeAsMultiScheme(scheme);
- KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
- return kafkaSpout;
- }
- };
-
- env.fromSpout(new KafkaOffsetSourceSpoutProvider()).withOutputFields(0).nameAs("kafkaLogLagChecker");
- env.fromSpout(kafkaMessageSpoutProvider).withOutputFields(2).nameAs("kafkaMessageFetcher").groupBy(Arrays.asList(0))
- .flatMap(new KafkaMessageDistributionExecutor());
- env.execute();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionBolt.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionBolt.java
new file mode 100644
index 0000000..0caa64c
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionBolt.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor1;
+import org.apache.eagle.metric.reportor.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple1;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Storm bolt that maintains a per-user Kafka message counter.
+ *
+ * <p>Each input tuple is read positionally: field 0 as the user name (String) and
+ * field 1 as a timestamp (Long). For every tuple the bolt increments an
+ * {@code eagle.kafka.message.count} metric whose dimensions are the configured
+ * {@code site} and {@code topic} plus the tuple's user. The bolt emits no tuples.
+ *
+ * <p>Configuration keys read in {@link #prepare}:
+ * <ul>
+ *   <li>{@code dataSourceConfig.site}, {@code dataSourceConfig.topic} - base metric dimensions</li>
+ *   <li>{@code dataSourceConfig.kafkaDistributionDataIntervalMin} - optional metric granularity
+ *       in minutes; defaults to one minute when absent</li>
+ *   <li>Eagle service host, port, username and password - passed to the metric listener</li>
+ * </ul>
+ */
+public class KafkaMessageDistributionBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionBolt.class);
+    /** Default metric granularity in milliseconds (one minute). */
+    private static final long DEFAULT_METRIC_GRANULARITY = 60 * 1000;
+    /** Name of the counter metric maintained by this bolt. */
+    private static final String METRIC_NAME = "eagle.kafka.message.count";
+
+    private final Config config;
+    private Map<String, String> baseMetricDimension;
+    private MetricRegistry registry;
+    private EagleMetricListener listener;
+    private long granularity;
+    private OutputCollector collector;
+
+    /**
+     * @param config application configuration; only stored here, read later in {@link #prepare}
+     */
+    public KafkaMessageDistributionBolt(Config config){
+        this.config = config;
+    }
+
+    /**
+     * Initializes the bolt: stores the collector, builds the base metric dimensions,
+     * resolves the metric granularity and creates the metric listener.
+     *
+     * @param stormConf Storm configuration; not read by this bolt
+     * @param context   topology context; not read by this bolt
+     * @param collector collector used by {@link #execute} to ack input tuples
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        // Must be stored: execute() acks every tuple through this collector. Leaving it
+        // unset made the ack in execute()'s finally block dereference null.
+        this.collector = collector;
+
+        String site = config.getString("dataSourceConfig.site");
+        String topic = config.getString("dataSourceConfig.topic");
+        this.baseMetricDimension = new HashMap<>();
+        this.baseMetricDimension.put("site", site);
+        this.baseMetricDimension.put("topic", topic);
+        this.registry = new MetricRegistry();
+
+        this.granularity = DEFAULT_METRIC_GRANULARITY;
+        if (config.hasPath("dataSourceConfig.kafkaDistributionDataIntervalMin")) {
+            // Configured in minutes; MILLIS_PER_MINUTE is a long, so the product is computed as long.
+            this.granularity = config.getInt("dataSourceConfig.kafkaDistributionDataIntervalMin") * DateUtils.MILLIS_PER_MINUTE;
+        }
+
+        String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+        String host = config.getString(servicePrefix + EagleConfigConstants.HOST);
+        int port = config.getInt(servicePrefix + EagleConfigConstants.PORT);
+        String username = config.getString(servicePrefix + EagleConfigConstants.USERNAME);
+        String password = config.getString(servicePrefix + EagleConfigConstants.PASSWORD);
+        // NOTE(review): presumably this listener forwards metric values to the Eagle service - confirm.
+        this.listener = new EagleServiceReporterMetricListener(host, port, username, password);
+    }
+
+    /**
+     * Builds the encoded metric key for a user by combining the metric name with the
+     * base dimensions (site, topic) and the given user.
+     *
+     * @param user value stored under the {@code user} dimension
+     * @return the key produced by {@code MetricKeyCodeDecoder.codeMetricKey}
+     */
+    public String generateMetricKey(String user) {
+        Map<String, String> dimensions = new HashMap<>(baseMetricDimension);
+        dimensions.put("user", user);
+        return MetricKeyCodeDecoder.codeMetricKey(METRIC_NAME, dimensions);
+    }
+
+    /** Declares no output fields; this bolt does not emit tuples. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+
+    /**
+     * Counts one message for the tuple's user. The first tuple seen for a metric key
+     * registers a new counter with an initial value of 1.0; later tuples update the
+     * existing metric by 1. Any exception is logged, and the tuple is acked regardless
+     * of the outcome.
+     *
+     * @param input tuple whose field 0 is read as the user and field 1 as the timestamp
+     */
+    @Override
+    public void execute(Tuple input) {
+        try {
+            String user = input.getString(0);
+            Long timestamp = input.getLong(1);
+            String metricKey = generateMetricKey(user);
+            // Single registry lookup; the result is reused for both the existence check and the update.
+            EagleMetric existing = (EagleMetric) registry.getMetrics().get(metricKey);
+            if (existing == null) {
+                EagleCounterMetric metric = new EagleCounterMetric(timestamp, metricKey, 1.0, granularity);
+                metric.registerListener(listener);
+                registry.register(metricKey, metric);
+            }
+            else {
+                existing.update(1, timestamp);
+                //TODO: if we need to remove metric from registry
+            }
+        }
+        catch (Exception ex) {
+            LOG.error("Got an exception, ex: ", ex);
+        }finally {
+            // Ack even on failure: errors are logged above rather than failing the tuple.
+            collector.ack(input);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
deleted file mode 100644
index dbb6e9a..0000000
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.metric.kafka;
-
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
-import org.apache.commons.lang.time.DateUtils;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor1;
-import org.apache.eagle.metric.reportor.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple1;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<String> {
-
- private Config config;
- private Map<String, String> baseMetricDimension;
- private MetricRegistry registry;
- private EagleMetricListener listener;
- private long granularity;
- private static final long DEFAULT_METRIC_GRANULARITY = 60 * 1000;
- private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionExecutor.class);
-
- @Override
- public void prepareConfig(Config config) {
- this.config = config;
- }
-
- @Override
- public void init() {
- String site = config.getString("dataSourceConfig.site");
- String topic = config.getString("dataSourceConfig.topic");
- this.baseMetricDimension = new HashMap<>();
- this.baseMetricDimension.put("site", site);
- this.baseMetricDimension.put("topic", topic);
- registry = new MetricRegistry();
-
- this.granularity = DEFAULT_METRIC_GRANULARITY;
- if (config.hasPath("dataSourceConfig.kafkaDistributionDataIntervalMin")) {
- this.granularity = config.getInt("dataSourceConfig.kafkaDistributionDataIntervalMin") * DateUtils.MILLIS_PER_MINUTE;
- }
-
- String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
- int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
- String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
- String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
- listener = new EagleServiceReporterMetricListener(host, port, username, password);
- }
-
- public String generateMetricKey(String user) {
- Map<String, String> dimensions = new HashMap<>();
- dimensions.putAll(baseMetricDimension);
- dimensions.put("user", user);
- String metricName = "eagle.kafka.message.count";
- String encodedMetricName = MetricKeyCodeDecoder.codeMetricKey(metricName, dimensions);
- return encodedMetricName;
- }
-
- @Override
- public void flatMap(List<Object> input, Collector<Tuple1<String>> collector) {
- try {
- String user = (String) input.get(0);
- Long timestamp = (Long) input.get(1);
- String metricKey = generateMetricKey(user);
- if (registry.getMetrics().get(metricKey) == null) {
- EagleCounterMetric metric = new EagleCounterMetric(timestamp, metricKey, 1.0, granularity);
- metric.registerListener(listener);
- registry.register(metricKey, metric);
- }
- else {
- EagleMetric metric = (EagleMetric)registry.getMetrics().get(metricKey);
- metric.update(1, timestamp);
- //TODO: if we need to remove metric from registry
- }
- }
- catch (Exception ex) {
- LOG.error("Got an exception, ex: ", ex);
- }
- }
-}