You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2014/07/22 19:21:37 UTC
svn commit: r1612617 - in /chukwa/trunk: ./
src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/
src/main/java/org/apache/hadoop/chukwa/datacollection/agent/
src/main/java/org/apache/hadoop/chukwa/util/
src/test/java/org/apache/hadoop/chukwa/...
Author: eyang
Date: Tue Jul 22 17:21:37 2014
New Revision: 1612617
URL: http://svn.apache.org/r1612617
Log:
CHUKWA-715. Added Oozie Adaptor for collecting Oozie metrics. (Sreepathi Prasanna via Eric Yang)
Added:
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/ChukwaUtil.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/RestUtil.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestOozieAdaptor.java
Modified:
chukwa/trunk/CHANGES.txt
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
Modified: chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/chukwa/trunk/CHANGES.txt?rev=1612617&r1=1612616&r2=1612617&view=diff
==============================================================================
--- chukwa/trunk/CHANGES.txt (original)
+++ chukwa/trunk/CHANGES.txt Tue Jul 22 17:21:37 2014
@@ -12,6 +12,8 @@ Release 0.6 - Unreleased
NEW FEATURES
+ CHUKWA-715. Added Oozie Adaptor for collecting Oozie metrics. (Sreepathi Prasanna via Eric Yang)
+
CHUKWA-712. Implemented generic REST Adaptor for fetch data from web service. (Sreepathi Prasanna via Eric Yang)
CHUKWA-674. Integrated Chukwa collector feature to Chukwa Agent. (Eric Yang)
Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java?rev=1612617&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java (added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java Tue Jul 22 17:21:37 2014
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Calendar;
+import java.util.TimeZone;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.util.ChukwaUtil;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.chukwa.util.RestUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
+
+public class OozieAdaptor extends AbstractAdaptor {
+
+ private static Logger log = Logger.getLogger(OozieAdaptor.class);
+ private String uri;
+
+ private long sendOffset;
+ private Configuration chukwaConfiguration = null;
+ private static UserGroupInformation UGI = null;
+ private boolean isKerberosEnabled = false;
+ private int length = 0;
+ private final ScheduledExecutorService scheduler = Executors
+ .newScheduledThreadPool(1);
+ private static final long initialDelay = 60; // seconds
+ private static long periodicity = 60; // seconds
+ private ScheduledFuture<?> scheduledCollectorThread;
+
+ @Override
+ public String parseArgs(String s) {
+ String[] tokens = s.split(" ");
+ if (tokens.length == 2) {
+ uri = tokens[0];
+ try {
+ periodicity = Integer.parseInt(tokens[1]);
+ } catch (NumberFormatException e) {
+ log.warn("OozieAdaptor: incorrect argument for period. Expecting number");
+ return null;
+ }
+ } else {
+ log.warn("bad syntax in OozieAdaptor args");
+ return null;
+ }
+ return s;
+ }
+
+ @Override
+ public void start(long offset) throws AdaptorException {
+ sendOffset = offset;
+ init(); // initialize the configuration
+ log.info("Starting Oozie Adaptor with [ " + sendOffset + " ] offset");
+ scheduledCollectorThread = scheduler.scheduleAtFixedRate(
+ new OozieMetricsCollector(), initialDelay, periodicity,
+ TimeUnit.SECONDS);
+ log.info("scheduled");
+ }
+
+ @Override
+ public String getCurrentStatus() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(type);
+ buffer.append(" ");
+ buffer.append(uri);
+ buffer.append(" ");
+ buffer.append(periodicity);
+ return buffer.toString();
+ }
+
+ @Override
+ public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+ throws AdaptorException {
+ scheduledCollectorThread.cancel(true);
+ scheduler.shutdown();
+ return sendOffset;
+ }
+
+ private class OozieMetricsCollector implements Runnable {
+ @Override
+ public void run() {
+ try {
+ if (isKerberosEnabled) {
+ if (UGI == null) {
+ throw new IllegalStateException("UGI Login context is null");
+ }
+
+ UGI.checkTGTAndReloginFromKeytab();
+ length = UGI.doAs(new PrivilegedExceptionAction<Integer>() {
+ @Override
+ public Integer run() throws Exception {
+ return processMetrics();
+ }
+ });
+
+ } else {
+ length = processMetrics();
+ }
+
+ if (length <= 0) {
+ log.warn("Oozie is either not responding or sending zero payload");
+ } else {
+ log.info("Processing a oozie instrumentation payload of [" + length
+ + "] bytes");
+ }
+ } catch (Exception e) {
+ log.error(ExceptionUtil.getStackTrace(e));
+ log.error("Exception occured while getting oozie metrics " + e);
+ }
+ }
+ }
+
+ private void init() {
+ if (getChukwaConfiguration() == null) {
+ setChukwaConfiguration(ChukwaUtil.readConfiguration());
+ }
+ String authType = getChukwaConfiguration().get(
+ "chukwaAgent.hadoop.authentication.type");
+ if (authType != null && authType.equalsIgnoreCase("kerberos")) {
+ login(); // get the UGI context
+ isKerberosEnabled = true;
+ }
+ }
+
+ private void login() {
+ try {
+ String principalConfig = getChukwaConfiguration().get(
+ "chukwaAgent.hadoop.authentication.kerberos.principal",
+ System.getProperty("user.name"));
+ String hostname = null;
+ String principalName = SecurityUtil.getServerPrincipal(principalConfig,
+ hostname);
+ UGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+ principalName,
+ getChukwaConfiguration().get(
+ "chukwaAgent.hadoop.authentication.kerberos.keytab"));
+ } catch (IOException e) {
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
+ }
+
+ private int processMetrics() {
+ return addChunkToReceiver(getOozieMetrics().getBytes());
+ }
+
+ private String getOozieMetrics() {
+ return RestUtil.getResponseAsString(uri);
+ }
+
+ public int addChunkToReceiver(byte[] data) {
+ try {
+ sendOffset += data.length;
+ ChunkImpl c = new ChunkImpl(type, "REST", sendOffset, data, this);
+ long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+ .getTimeInMillis();
+ c.addTag("timeStamp=\"" + rightNow + "\"");
+ dest.add(c);
+ } catch (Exception e) {
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
+ return data.length;
+ }
+
+ public Configuration getChukwaConfiguration() {
+ return chukwaConfiguration;
+ }
+
+ public void setChukwaConfiguration(Configuration chukwaConfiguration) {
+ this.chukwaConfiguration = chukwaConfiguration;
+ }
+}
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=1612617&r1=1612616&r2=1612617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Tue Jul 22 17:21:37 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.chukwa.datacoll
import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector;
import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
+import org.apache.hadoop.chukwa.util.ChukwaUtil;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.conf.Configuration;
@@ -276,7 +277,7 @@ public class ChukwaAgent implements Adap
System.exit(0);
}
- conf = readConfig();
+ conf = ChukwaUtil.readConfiguration();
agent = new ChukwaAgent(conf);
if (agent.anotherAgentIsRunning()) {
@@ -736,44 +737,6 @@ public class ChukwaAgent implements Adap
return connector;
}
- private static Configuration readConfig() {
- Configuration conf = new Configuration();
-
- String chukwaHomeName = System.getenv("CHUKWA_HOME");
- if (chukwaHomeName == null) {
- chukwaHomeName = "";
- }
- File chukwaHome = new File(chukwaHomeName).getAbsoluteFile();
-
- log.info("Config - CHUKWA_HOME: [" + chukwaHome.toString() + "]");
-
- String chukwaConfName = System.getProperty("CHUKWA_CONF_DIR");
- File chukwaConf;
- if (chukwaConfName != null)
- chukwaConf = new File(chukwaConfName).getAbsoluteFile();
- else
- chukwaConf = new File(chukwaHome, "conf");
-
- log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
- File agentConf = new File(chukwaConf, "chukwa-agent-conf.xml");
- conf.addResource(new Path(agentConf.getAbsolutePath()));
- conf.addResource(new Path( new File(chukwaConf, "chukwa-common.xml").getAbsolutePath()));
- if (conf.get("chukwaAgent.checkpoint.dir") == null)
- conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var")
- .getAbsolutePath());
- conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf,
- "initial_adaptors").getAbsolutePath());
- try {
- Configuration chukwaAgentConf = new Configuration(false);
- chukwaAgentConf.addResource(new Path(agentConf.getAbsolutePath()));
- Checker.checkConf(new OptDictionary(new File(new File(chukwaHome, "share/chukwa/lib"), "agent.dict")),
- HSlurper.fromHConf(chukwaAgentConf));
- } catch(Exception e) {
- e.printStackTrace();
- }
- return conf;
- }
-
public void shutdown() {
shutdown(false);
}
Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/ChukwaUtil.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/ChukwaUtil.java?rev=1612617&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/ChukwaUtil.java (added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/ChukwaUtil.java Tue Jul 22 17:21:37 2014
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import edu.berkeley.confspell.Checker;
+import edu.berkeley.confspell.HSlurper;
+import edu.berkeley.confspell.OptDictionary;
+
+/*
+ * Create a common set of utility classes for code reuse
+ */
+
+public class ChukwaUtil {
+
+ private static Logger log = Logger.getLogger(ChukwaUtil.class);
+
+ public static Configuration readConfiguration() {
+ Configuration conf = new Configuration();
+
+ String chukwaHomeName = System.getenv("CHUKWA_HOME");
+ if (chukwaHomeName == null) {
+ chukwaHomeName = "";
+ }
+ File chukwaHome = new File(chukwaHomeName).getAbsoluteFile();
+
+ log.info("Config - CHUKWA_HOME: [" + chukwaHome.toString() + "]");
+
+ String chukwaConfName = System.getProperty("CHUKWA_CONF_DIR");
+ File chukwaConf;
+ if (chukwaConfName != null)
+ chukwaConf = new File(chukwaConfName).getAbsoluteFile();
+ else
+ chukwaConf = new File(chukwaHome, "conf");
+
+ log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
+ File agentConf = new File(chukwaConf, "chukwa-agent-conf.xml");
+ conf.addResource(new Path(agentConf.getAbsolutePath()));
+ if (conf.get("chukwaAgent.checkpoint.dir") == null)
+ conf.set("chukwaAgent.checkpoint.dir",
+ new File(chukwaHome, "var").getAbsolutePath());
+ conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf,
+ "initial_adaptors").getAbsolutePath());
+ try {
+ Configuration chukwaAgentConf = new Configuration(false);
+ chukwaAgentConf.addResource(new Path(agentConf.getAbsolutePath()));
+ Checker.checkConf(new OptDictionary(new File(new File(chukwaHome,
+ "share/chukwa/lib"), "agent.dict")), HSlurper
+ .fromHConf(chukwaAgentConf));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return conf;
+ }
+
+}
Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/RestUtil.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/RestUtil.java?rev=1612617&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/RestUtil.java (added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/RestUtil.java Tue Jul 22 17:21:37 2014
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.log4j.Logger;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.WebResource;
+
+/* This should contain set of helper methods to convert the
+ * response returned from any web servers to required formats
+ * This can be modified to accept different headers based on the
+ * file formats.
+ */
+public class RestUtil {
+
+ private static WebResource webResource;
+ private static Client webClient;
+ private static Logger log = Logger.getLogger(RestUtil.class);
+
+ public static String getResponseAsString(String URI) {
+ if (URI == null) {
+ throw new IllegalStateException("URI cannot be blank");
+ }
+
+ String response = null;
+ webClient = Client.create();
+ try {
+ webResource = webClient.resource(URI);
+ response = webResource.accept(MediaType.APPLICATION_JSON_TYPE).get(
+ String.class);
+ } catch (ClientHandlerException e) {
+ Throwable t = e.getCause();
+ if (t instanceof java.net.ConnectException) {
+ log.warn("Connect exception trying to connect to [" + URI
+ + "]. Make sure the service is running");
+ } else {
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
+ } finally {
+ if (webClient != null) {
+ webClient.destroy();
+ }
+ }
+ return response;
+ }
+
+}
\ No newline at end of file
Added: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestOozieAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestOozieAdaptor.java?rev=1612617&view=auto
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestOozieAdaptor.java (added)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestOozieAdaptor.java Tue Jul 22 17:21:37 2014
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestOozieAdaptor extends TestCase implements ChunkReceiver {
+
+ volatile boolean receivedOK = false;
+ public String str = null;
+
+ @Test
+ public void testMessageReceivedOk() throws Exception {
+ OozieAdaptor oozieAdaptor = new OozieAdaptor();
+
+ oozieAdaptor.parseArgs("TestOozieAdaptor", "0", AdaptorManager.NULL);
+ oozieAdaptor.start("id", "TestOozieAdaptor", 0, this);
+
+ JSONObject json = composeMessage();
+ int lengthReturned = oozieAdaptor.addChunkToReceiver(json.toString()
+ .getBytes());
+ assertEquals(84, lengthReturned); // 84 is the length of json string
+
+ synchronized (this) {
+ wait(1000);
+ }
+ assertTrue(receivedOK);
+ }
+
+ @SuppressWarnings("unchecked")
+ private JSONObject composeMessage() {
+ JSONObject json = new JSONObject();
+ json.put("oozie.jvm.used.memory", 10);
+ json.put("oozie.jvm.free.memory", 90);
+ json.put("oozie.jvm.total.memory", 100);
+ str = json.toString();
+ return json;
+ }
+
+ @Override
+ public void add(Chunk C) throws InterruptedException {
+ assertTrue(C.getDataType().equals("TestOozieAdaptor"));
+ assertEquals(C.getSeqID(), C.getData().length);
+ byte[] data = C.getData();
+ String s = new String(data);
+
+ assertTrue(str.equals(s));
+
+ receivedOK = true;
+ synchronized (this) {
+ notify();
+ }
+ }
+}