You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2017/02/23 08:53:14 UTC

eagle git commit: [EAGLE-927] support for hadoop jmx resource info

Repository: eagle
Updated Branches:
  refs/heads/master 051dc69c5 -> 3011fdf84


[EAGLE-927] support for hadoop jmx resource info

Author: wujinhu <wu...@126.com>

Closes #839 from wujinhu/EAGLE-927.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/3011fdf8
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/3011fdf8
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/3011fdf8

Branch: refs/heads/master
Commit: 3011fdf845e429d842e528ff00a4a7b7b55038e9
Parents: 051dc69
Author: wujinhu <wu...@126.com>
Authored: Thu Feb 23 16:53:07 2017 +0800
Committer: wujinhu <wu...@126.com>
Committed: Thu Feb 23 16:53:07 2017 +0800

----------------------------------------------------------------------
 .../StringSubtractFunctionExtension.java        | 106 +++++++++++++++++++
 .../org/apache/eagle/alert/utils/JsonUtils.java |  20 ++++
 .../src/main/resources/string.siddhiext         |   3 +-
 .../StringSubtractFunctionExtensionTest.java    |  68 ++++++++++++
 .../src/test/resources/string.siddhiext         |  19 ++++
 .../hadoop_jmx_collector/hadoop_jmx_kafka.py    |   7 +-
 .../hadoop_jmx_collector/metric_collector.py    |   6 +-
 ...le.metric.HadoopMetricMonitorAppProdiver.xml |  38 +++++++
 8 files changed, 263 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/3011fdf8/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringSubtractFunctionExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringSubtractFunctionExtension.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringSubtractFunctionExtension.java
new file mode 100644
index 0000000..9d26adf
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/siddhiext/StringSubtractFunctionExtension.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.siddhiext;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.eagle.alert.utils.JsonUtils;
+import org.codehaus.jettison.json.JSONArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.executor.function.FunctionExecutor;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Siddhi function extension backing {@code string:subtract(first, second)}.
+ *
+ * <p>Both arguments are JSON array strings. The result is the list of elements of the first
+ * array that remain after subtracting the elements of the second array, joined with a newline.
+ */
+public class StringSubtractFunctionExtension extends FunctionExecutor {
+    private static final Logger LOG = LoggerFactory.getLogger(StringSubtractFunctionExtension.class);
+
+    /**
+     * Validates the function arguments; called before any of the execute methods.
+     *
+     * @param attributeExpressionExecutors the executors of each function parameter
+     * @param executionPlanContext         the context of the execution plan
+     * @throws ExecutionPlanValidationException if the function is not given exactly two
+     *                                          arguments, or if either argument is not a string
+     */
+    @Override
+    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+        if (attributeExpressionExecutors.length != 2) {
+            throw new ExecutionPlanValidationException("Invalid no of arguments passed to string:subtract() function, "
+                    + "required 2, but found " + attributeExpressionExecutors.length);
+        }
+
+        // execute() casts both values to String, so every argument has to be checked, not just the first
+        for (ExpressionExecutor executor : attributeExpressionExecutors) {
+            Attribute.Type attributeType = executor.getReturnType();
+            if (attributeType != Attribute.Type.STRING) {
+                throw new ExecutionPlanValidationException("Invalid parameter type found for the argument of string:subtract() function, "
+                        + "required " + Attribute.Type.STRING
+                        + ", but found " + attributeType.toString());
+            }
+        }
+    }
+
+    /**
+     * Computes the subtraction of two lists of strings upon event arrival.
+     *
+     * <p>Each argument is a JSON array string that is converted to a list with
+     * {@link JsonUtils#jsonStringToList(String)} before the second list is subtracted from the first.
+     *
+     * @param data the runtime values of the two function parameters
+     * @return the remaining elements joined with {@code "\n"}, or {@code null} if an exception
+     *         is thrown while evaluating the arguments (the exception is logged)
+     */
+    @Override
+    protected Object execute(Object[] data) {
+        try {
+            List<String> lhs = JsonUtils.jsonStringToList((String) data[0]);
+            List<String> rhs = JsonUtils.jsonStringToList((String) data[1]);
+
+            return org.apache.commons.lang.StringUtils.join(ListUtils.subtract(lhs, rhs), "\n");
+        } catch (Exception e) {
+            // the throwable is the trailing argument with no placeholder of its own,
+            // so SLF4J logs it with its stack trace
+            LOG.warn("exception found while evaluating string:subtract()", e);
+            return null;
+        }
+    }
+
+    /**
+     * Single-value variant of execute; init() only accepts exactly two arguments,
+     * so there is nothing to compute here.
+     *
+     * @param data the runtime value of the function parameter
+     * @return always {@code null}
+     */
+    @Override
+    protected Object execute(Object data) {
+        return null;
+    }
+
+    /** No resources to acquire. */
+    @Override
+    public void start() {
+    }
+
+    /** No resources to release. */
+    @Override
+    public void stop() {
+    }
+
+    /**
+     * @return {@link Attribute.Type#STRING}, the type of the joined result
+     */
+    @Override
+    public Attribute.Type getReturnType() {
+        return Attribute.Type.STRING;
+    }
+
+    /**
+     * @return {@code null}; this function keeps no state between events
+     */
+    @Override
+    public Object[] currentState() {
+        return null;
+    }
+
+    /** Nothing to restore; this function keeps no state between events. */
+    @Override
+    public void restoreState(Object[] state) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/3011fdf8/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java
index 2ee1a5f..cc75d34 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/JsonUtils.java
@@ -17,9 +17,13 @@
 package org.apache.eagle.alert.utils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.codehaus.jettison.json.JSONArray;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class JsonUtils {
 
     public static final ObjectMapper mapper = new ObjectMapper();
@@ -33,4 +37,20 @@ public class JsonUtils {
         }
         return "";
     }
+
+    /**
+     * Converts a JSON array string into a list holding each element read as a string.
+     *
+     * <p>This method never throws: parse failures are logged and whatever elements were read
+     * before the failure are returned.
+     *
+     * @param message JSON array text such as {@code ["a", "b"]}; may be {@code null} or empty
+     * @return the elements in array order; an empty list when {@code message} is {@code null}
+     *         or empty
+     */
+    public static List<String> jsonStringToList(String message) {
+        List<String> result = new ArrayList<>();
+        // nothing to parse; avoids using a caught NullPointerException to handle null input
+        if (message == null || message.isEmpty()) {
+            return result;
+        }
+        try {
+            JSONArray jsonArray = new JSONArray(message);
+            for (int i = 0; i < jsonArray.length(); ++i) {
+                result.add(jsonArray.getString(i));
+            }
+        } catch (Exception e) {
+            // the throwable is the trailing argument with no placeholder of its own,
+            // so SLF4J logs it with its stack trace
+            LOG.warn("exception found while parsing json array string", e);
+        }
+
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/3011fdf8/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/string.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/string.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/string.siddhiext
index e16be99..ac8e099 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/string.siddhiext
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/resources/string.siddhiext
@@ -15,4 +15,5 @@
  * limitations under the License.
  */
 
-empty=org.apache.eagle.alert.siddhiext.StringEmptyFunctionExtension
\ No newline at end of file
+empty=org.apache.eagle.alert.siddhiext.StringEmptyFunctionExtension
+subtract=org.apache.eagle.alert.siddhiext.StringSubtractFunctionExtension
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/3011fdf8/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java
new file mode 100644
index 0000000..4a31c69
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/siddhiext/StringSubtractFunctionExtensionTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.siddhiext;
+
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+import java.util.concurrent.Semaphore;
+
+/**
+ * Exercises {@code string:subtract()} through a Siddhi execution plan.
+ */
+public class StringSubtractFunctionExtensionTest {
+    private static final Logger LOG = LoggerFactory.getLogger(StringSubtractFunctionExtensionTest.class);
+
+    /**
+     * Sends one event whose two string columns are JSON arrays and checks that the output
+     * event carries the elements of the first array that are absent from the second,
+     * joined with newlines.
+     */
+    @Test
+    public void testStringSubtract() throws Exception {
+        // no permits initially; the callback releases one once the output event has arrived
+        Semaphore semp = new Semaphore(0);
+        String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " +
+                " from log select string:subtract(switchLabel, message) as alertKey insert into output; ";
+        SiddhiManager manager = new SiddhiManager();
+        ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql);
+        // written by the callback before release() and read below only after tryAcquire() succeeds
+        final Event[][] received = new Event[1][];
+        runtime.addCallback("output", new StreamCallback() {
+            @Override
+            public void receive(Event[] events) {
+                EventPrinter.print(events);
+                // only capture here; asserting on the callback thread could skip release()
+                // and the failure would not be reported on the test thread
+                received[0] = events;
+                semp.release();
+            }
+        });
+
+        runtime.start();
+        try {
+            InputHandler logInput = runtime.getInputHandler("log");
+            Event e = new Event();
+            e.setTimestamp(System.currentTimeMillis());
+            String ths = "[\"a\", \"b\", \"c\", \"d\", \"e\"]";
+            String rhs = "[\"b\", \"d\"]";
+            e.setData(new Object[] {System.currentTimeMillis(), ths, "port01", rhs});
+            logInput.send(e);
+
+            // bounded wait so a missing output event fails the test instead of hanging it
+            Assert.assertTrue("no output event received within 10 seconds",
+                    semp.tryAcquire(10, java.util.concurrent.TimeUnit.SECONDS));
+            Assert.assertEquals(1, received[0].length);
+            Assert.assertEquals("a\nc\ne", received[0][0].getData(0).toString());
+        } finally {
+            runtime.shutdown();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/3011fdf8/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext
new file mode 100644
index 0000000..ac8e099
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/string.siddhiext
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+empty=org.apache.eagle.alert.siddhiext.StringEmptyFunctionExtension
+subtract=org.apache.eagle.alert.siddhiext.StringSubtractFunctionExtension
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/3011fdf8/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index 78183e0..1b036cd 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -38,6 +38,10 @@ class NNHAMetric(JmxMetricListener):
             else:
                 self.collector.on_bean_kv(self.PREFIX, component, "hastate", 1)
 
+class NameNodeInfo(JmxMetricListener):
+    def on_metric(self, metric):
+        if metric["metric"] == "hadoop.namenode.namenodeinfo.corruptfiles":
+            self.collector.collect(metric, "string")
 
 class MemoryUsageMetric(JmxMetricListener):
     PREFIX = "hadoop.namenode.jvm"
@@ -102,6 +106,7 @@ if __name__ == '__main__':
         NNCapacityUsageMetric(),
         JournalTransactionInfoMetric(),
         DatanodeFSDatasetState(),
-        HBaseRegionServerMetric()
+        HBaseRegionServerMetric(),
+        NameNodeInfo()
     )
     Runner.run(collector)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/3011fdf8/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index 2176db2..bf1d4df 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -294,12 +294,14 @@ class MetricCollector(threading.Thread):
     def start(self):
         super(MetricCollector, self).start()
 
-    def collect(self, msg):
+    def collect(self, msg, type='float'):
         try:
             if not msg.has_key("timestamp"):
                 msg["timestamp"] = int(round(time.time() * 1000))
-            if msg.has_key("value"):
+            if msg.has_key("value") and type == 'float':
                 msg["value"] = float(str(msg["value"]))
+            elif msg.has_key("value") and type == 'string':
+                msg["value"] = str(msg["value"])
             if not msg.has_key("host") or len(msg["host"]) == 0:
                 raise Exception("host is null: " + str(msg))
             if not msg.has_key("site"):

http://git-wip-us.apache.org/repos/asf/eagle/blob/3011fdf8/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
----------------------------------------------------------------------
diff --git a/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml b/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
index 9c34b1f..073c900 100644
--- a/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
+++ b/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
@@ -29,6 +29,13 @@
             <required>true</required>
         </property>
         <property>
+            <name>dataSinkConfig.HADOOP_JMX_RESOURCE_STREAM.topic</name>
+            <displayName>JMX Resource Kafka Topic</displayName>
+            <value>hadoop_jmx_resource_${siteId}</value>
+            <description>Hadoop JMX resource kafka topic name for stream: HADOOP_JMX_RESOURCE_STREAM</description>
+            <required>true</required>
+        </property>
+        <property>
             <name>dataSinkConfig.brokerList</name>
             <displayName>Kafka Brokers</displayName>
             <value>localhost:6667</value>
@@ -126,6 +133,37 @@
                 </column>
             </columns>
         </stream>
+        <stream>
+            <streamId>HADOOP_JMX_RESOURCE_STREAM</streamId>
+            <description>Hadoop JMX Resource Stream including name node, resource manager, etc.</description>
+            <columns>
+                <column>
+                    <name>host</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>timestamp</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>resource</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>component</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>site</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>value</name>
+                    <type>string</type>
+                    <defaultValue>""</defaultValue>
+                </column>
+            </columns>
+        </stream>
     </streams>
     <docs>
         <install>