You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2014/07/22 19:21:37 UTC

svn commit: r1612617 - in /chukwa/trunk: ./ src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ src/main/java/org/apache/hadoop/chukwa/util/ src/test/java/org/apache/hadoop/chukwa/...

Author: eyang
Date: Tue Jul 22 17:21:37 2014
New Revision: 1612617

URL: http://svn.apache.org/r1612617
Log:
CHUKWA-715. Added Oozie Adaptor for collecting Oozie metrics.  (Sreepathi Prasanna via Eric Yang)

Added:
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/ChukwaUtil.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/RestUtil.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestOozieAdaptor.java
Modified:
    chukwa/trunk/CHANGES.txt
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java

Modified: chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/chukwa/trunk/CHANGES.txt?rev=1612617&r1=1612616&r2=1612617&view=diff
==============================================================================
--- chukwa/trunk/CHANGES.txt (original)
+++ chukwa/trunk/CHANGES.txt Tue Jul 22 17:21:37 2014
@@ -12,6 +12,8 @@ Release 0.6 - Unreleased
 
   NEW FEATURES
 
+    CHUKWA-715. Added Oozie Adaptor for collecting Oozie metrics.  (Sreepathi Prasanna via Eric Yang)
+
     CHUKWA-712. Implemented generic REST Adaptor for fetch data from web service.  (Sreepathi Prasanna via Eric Yang)
 
     CHUKWA-674. Integrated Chukwa collector feature to Chukwa Agent.  (Eric Yang)

Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java?rev=1612617&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java (added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java Tue Jul 22 17:21:37 2014
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Calendar;
+import java.util.TimeZone;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.util.ChukwaUtil;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.chukwa.util.RestUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Logger;
+
+/**
+ * Adaptor that periodically fetches a payload from a REST endpoint (the Oozie
+ * instrumentation URI given in the adaptor arguments) and hands it to the
+ * chunk receiver as a single chunk.
+ *
+ * Adaptor arguments: <code>&lt;uri&gt; &lt;period in seconds&gt;</code>.
+ *
+ * When the configuration key
+ * <code>chukwaAgent.hadoop.authentication.type</code> is
+ * <code>kerberos</code>, a keytab login is performed on start and every
+ * fetch runs inside that login context.
+ */
+public class OozieAdaptor extends AbstractAdaptor {
+
+  private static final Logger log = Logger.getLogger(OozieAdaptor.class);
+  // Fixed charset so the emitted bytes do not depend on the platform default.
+  private static final java.nio.charset.Charset UTF_8 =
+      java.nio.charset.Charset.forName("UTF-8");
+  private static final long initialDelay = 60; // seconds
+  private static UserGroupInformation UGI = null;
+
+  private String uri;
+  private long sendOffset;
+  private Configuration chukwaConfiguration = null;
+  private boolean isKerberosEnabled = false;
+  // Instance state: each adaptor keeps the period parsed from its own args.
+  private long periodicity = 60; // seconds
+  private final ScheduledExecutorService scheduler = Executors
+      .newScheduledThreadPool(1);
+  private ScheduledFuture<?> scheduledCollectorThread;
+
+  /**
+   * Parses the adaptor arguments, expected as "uri period".
+   *
+   * @param s the raw argument string
+   * @return <code>s</code> when it is well formed, <code>null</code> when the
+   *         token count is wrong or the period is not a positive integer
+   */
+  @Override
+  public String parseArgs(String s) {
+    String[] tokens = s.split(" ");
+    if (tokens.length != 2) {
+      log.warn("bad syntax in OozieAdaptor args");
+      return null;
+    }
+    long period;
+    try {
+      period = Integer.parseInt(tokens[1]);
+    } catch (NumberFormatException e) {
+      log.warn("OozieAdaptor: incorrect argument for period. Expecting number");
+      return null;
+    }
+    if (period <= 0) {
+      // scheduleAtFixedRate rejects non-positive periods; fail here instead.
+      log.warn("OozieAdaptor: period must be a positive number of seconds");
+      return null;
+    }
+    // Only commit the new settings once the whole argument string is valid.
+    uri = tokens[0];
+    periodicity = period;
+    return s;
+  }
+
+  /**
+   * Loads the configuration (logging in via keytab when kerberos is
+   * configured) and schedules the periodic metrics collection.
+   *
+   * @param offset the offset to continue counting sent bytes from
+   */
+  @Override
+  public void start(long offset) throws AdaptorException {
+    sendOffset = offset;
+    init(); // initialize the configuration
+    log.info("Starting Oozie Adaptor with [ " + sendOffset + " ] offset");
+    scheduledCollectorThread = scheduler.scheduleAtFixedRate(
+        new OozieMetricsCollector(), initialDelay, periodicity,
+        TimeUnit.SECONDS);
+    log.info("scheduled");
+  }
+
+  /**
+   * @return the adaptor type, uri and period separated by single spaces
+   */
+  @Override
+  public String getCurrentStatus() {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append(type);
+    buffer.append(" ");
+    buffer.append(uri);
+    buffer.append(" ");
+    buffer.append(periodicity);
+    return buffer.toString();
+  }
+
+  /**
+   * Cancels the scheduled collection and shuts the scheduler down. The
+   * shutdown policy is not consulted.
+   *
+   * @return the current send offset
+   */
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+      throws AdaptorException {
+    // start() may never have been called, in which case nothing is scheduled.
+    if (scheduledCollectorThread != null) {
+      scheduledCollectorThread.cancel(true);
+    }
+    scheduler.shutdown();
+    return sendOffset;
+  }
+
+  /**
+   * Scheduled task: fetches the metrics once, inside the kerberos login
+   * context when enabled, and logs the outcome. Exceptions are logged and not
+   * rethrown, so a failing run does not cancel the periodic schedule.
+   */
+  private class OozieMetricsCollector implements Runnable {
+    @Override
+    public void run() {
+      try {
+        int length;
+        if (isKerberosEnabled) {
+          if (UGI == null) {
+            throw new IllegalStateException("UGI Login context is null");
+          }
+
+          UGI.checkTGTAndReloginFromKeytab();
+          length = UGI.doAs(new PrivilegedExceptionAction<Integer>() {
+            @Override
+            public Integer run() throws Exception {
+              return processMetrics();
+            }
+          });
+
+        } else {
+          length = processMetrics();
+        }
+
+        if (length <= 0) {
+          log.warn("Oozie is either not responding or sending zero payload");
+        } else {
+          log.info("Processing a oozie instrumentation payload of [" + length
+              + "] bytes");
+        }
+      } catch (Exception e) {
+        log.error(ExceptionUtil.getStackTrace(e));
+        log.error("Exception occured while getting oozie metrics " + e);
+      }
+    }
+  }
+
+  /**
+   * Reads the Chukwa configuration if none was injected and performs the
+   * kerberos login when the authentication type asks for it.
+   */
+  private void init() {
+    if (getChukwaConfiguration() == null) {
+      setChukwaConfiguration(ChukwaUtil.readConfiguration());
+    }
+    String authType = getChukwaConfiguration().get(
+        "chukwaAgent.hadoop.authentication.type");
+    if (authType != null && authType.equalsIgnoreCase("kerberos")) {
+      login(); // get the UGI context
+      isKerberosEnabled = true;
+    }
+  }
+
+  /**
+   * Logs in from the configured keytab and stores the resulting UGI. An
+   * IOException is logged and leaves UGI unset; the collector then reports
+   * the missing login context on every run.
+   */
+  private void login() {
+    try {
+      String principalConfig = getChukwaConfiguration().get(
+          "chukwaAgent.hadoop.authentication.kerberos.principal",
+          System.getProperty("user.name"));
+      String hostname = null;
+      String principalName = SecurityUtil.getServerPrincipal(principalConfig,
+          hostname);
+      UGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+          principalName,
+          getChukwaConfiguration().get(
+              "chukwaAgent.hadoop.authentication.kerberos.keytab"));
+    } catch (IOException e) {
+      log.error(ExceptionUtil.getStackTrace(e));
+    }
+  }
+
+  /**
+   * Fetches the metrics and forwards them to the receiver.
+   *
+   * @return the number of bytes forwarded, or 0 when no response was
+   *         obtained
+   */
+  private int processMetrics() {
+    String metrics = getOozieMetrics();
+    if (metrics == null) {
+      // RestUtil returns null when the request failed; report "no payload"
+      // instead of failing with a NullPointerException.
+      return 0;
+    }
+    return addChunkToReceiver(metrics.getBytes(UTF_8));
+  }
+
+  private String getOozieMetrics() {
+    return RestUtil.getResponseAsString(uri);
+  }
+
+  /**
+   * Wraps the data in a chunk tagged with the current time in milliseconds
+   * and adds it to the receiver. The send offset is advanced by the data
+   * length before the chunk is built, so the chunk's sequence id is the
+   * offset after this payload. Failures while building or adding the chunk
+   * are logged, not rethrown.
+   *
+   * @param data the payload bytes; <code>null</code> is treated as no data
+   * @return the length of <code>data</code>, or 0 when it is
+   *         <code>null</code>
+   */
+  public int addChunkToReceiver(byte[] data) {
+    if (data == null) {
+      return 0;
+    }
+    try {
+      sendOffset += data.length;
+      ChunkImpl c = new ChunkImpl(type, "REST", sendOffset, data, this);
+      // Epoch milliseconds are independent of the time zone.
+      long rightNow = System.currentTimeMillis();
+      c.addTag("timeStamp=\"" + rightNow + "\"");
+      dest.add(c);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the thread that runs the collector.
+      Thread.currentThread().interrupt();
+      log.error(ExceptionUtil.getStackTrace(e));
+    } catch (Exception e) {
+      log.error(ExceptionUtil.getStackTrace(e));
+    }
+    return data.length;
+  }
+
+  public Configuration getChukwaConfiguration() {
+    return chukwaConfiguration;
+  }
+
+  public void setChukwaConfiguration(Configuration chukwaConfiguration) {
+    this.chukwaConfiguration = chukwaConfiguration;
+  }
+}

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=1612617&r1=1612616&r2=1612617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Tue Jul 22 17:21:37 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.chukwa.datacoll
 import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector;
 import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
 import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
+import org.apache.hadoop.chukwa.util.ChukwaUtil;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.conf.Configuration;
@@ -276,7 +277,7 @@ public class ChukwaAgent implements Adap
         System.exit(0);
       }
 
-      conf = readConfig();
+      conf = ChukwaUtil.readConfiguration();
       agent = new ChukwaAgent(conf);
       
       if (agent.anotherAgentIsRunning()) {
@@ -736,44 +737,6 @@ public class ChukwaAgent implements Adap
     return connector;
   }
 
-  private static Configuration readConfig() {
-    Configuration conf = new Configuration();
-
-    String chukwaHomeName = System.getenv("CHUKWA_HOME");
-    if (chukwaHomeName == null) {
-      chukwaHomeName = "";
-    }
-    File chukwaHome = new File(chukwaHomeName).getAbsoluteFile();
-
-    log.info("Config - CHUKWA_HOME: [" + chukwaHome.toString() + "]");
-
-    String chukwaConfName = System.getProperty("CHUKWA_CONF_DIR");
-    File chukwaConf;
-    if (chukwaConfName != null)
-      chukwaConf = new File(chukwaConfName).getAbsoluteFile();
-    else
-      chukwaConf = new File(chukwaHome, "conf");
-
-    log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
-    File agentConf = new File(chukwaConf, "chukwa-agent-conf.xml");
-    conf.addResource(new Path(agentConf.getAbsolutePath()));
-    conf.addResource(new Path( new File(chukwaConf, "chukwa-common.xml").getAbsolutePath()));
-    if (conf.get("chukwaAgent.checkpoint.dir") == null)
-      conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var")
-          .getAbsolutePath());
-    conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf,
-        "initial_adaptors").getAbsolutePath());
-    try { 
-      Configuration chukwaAgentConf = new Configuration(false);
-      chukwaAgentConf.addResource(new Path(agentConf.getAbsolutePath()));
-      Checker.checkConf(new OptDictionary(new File(new File(chukwaHome, "share/chukwa/lib"), "agent.dict")),
-          HSlurper.fromHConf(chukwaAgentConf));
-    } catch(Exception e) {
-      e.printStackTrace();
-    }    
-    return conf;
-  }
-
   public void shutdown() {
     shutdown(false);
   }

Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/ChukwaUtil.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/ChukwaUtil.java?rev=1612617&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/ChukwaUtil.java (added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/ChukwaUtil.java Tue Jul 22 17:21:37 2014
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import edu.berkeley.confspell.Checker;
+import edu.berkeley.confspell.HSlurper;
+import edu.berkeley.confspell.OptDictionary;
+
+/**
+ * Common utility methods shared across Chukwa components.
+ */
+public class ChukwaUtil {
+
+  private static final Logger log = Logger.getLogger(ChukwaUtil.class);
+
+  /**
+   * Builds the agent configuration.
+   *
+   * The home directory comes from the <code>CHUKWA_HOME</code> environment
+   * variable (empty path when unset). The configuration directory comes from
+   * the <code>CHUKWA_CONF_DIR</code> system property and falls back to
+   * <code>conf</code> under the home directory. Both
+   * <code>chukwa-agent-conf.xml</code> and <code>chukwa-common.xml</code>
+   * from that directory are added as resources.
+   * <code>chukwaAgent.checkpoint.dir</code> defaults to <code>var</code>
+   * under the home directory, and <code>chukwaAgent.initial_adaptors</code>
+   * is always set to <code>initial_adaptors</code> in the configuration
+   * directory.
+   *
+   * The agent configuration is also run through the configuration checker
+   * using <code>share/chukwa/lib/agent.dict</code>; a failure of that check
+   * is logged and does not prevent the configuration from being returned.
+   *
+   * @return the assembled configuration
+   */
+  public static Configuration readConfiguration() {
+    Configuration conf = new Configuration();
+
+    String chukwaHomeName = System.getenv("CHUKWA_HOME");
+    if (chukwaHomeName == null) {
+      chukwaHomeName = "";
+    }
+    File chukwaHome = new File(chukwaHomeName).getAbsoluteFile();
+
+    log.info("Config - CHUKWA_HOME: [" + chukwaHome.toString() + "]");
+
+    String chukwaConfName = System.getProperty("CHUKWA_CONF_DIR");
+    File chukwaConf;
+    if (chukwaConfName != null) {
+      chukwaConf = new File(chukwaConfName).getAbsoluteFile();
+    } else {
+      chukwaConf = new File(chukwaHome, "conf");
+    }
+
+    log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
+    File agentConf = new File(chukwaConf, "chukwa-agent-conf.xml");
+    conf.addResource(new Path(agentConf.getAbsolutePath()));
+    // The agent's former readConfig() also loaded the common settings; keep
+    // doing so, so that moving the code here does not drop those keys.
+    conf.addResource(new Path(new File(chukwaConf, "chukwa-common.xml")
+        .getAbsolutePath()));
+    if (conf.get("chukwaAgent.checkpoint.dir") == null) {
+      conf.set("chukwaAgent.checkpoint.dir",
+          new File(chukwaHome, "var").getAbsolutePath());
+    }
+    conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf,
+        "initial_adaptors").getAbsolutePath());
+    try {
+      // Check only the agent file itself, without the default resources.
+      Configuration chukwaAgentConf = new Configuration(false);
+      chukwaAgentConf.addResource(new Path(agentConf.getAbsolutePath()));
+      Checker.checkConf(new OptDictionary(new File(new File(chukwaHome,
+          "share/chukwa/lib"), "agent.dict")), HSlurper
+          .fromHConf(chukwaAgentConf));
+    } catch (Exception e) {
+      // The check is advisory: report through the logger and carry on.
+      log.warn("Unable to check chukwa-agent-conf.xml against agent.dict", e);
+    }
+    return conf;
+  }
+
+}

Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/RestUtil.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/RestUtil.java?rev=1612617&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/RestUtil.java (added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/util/RestUtil.java Tue Jul 22 17:21:37 2014
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.log4j.Logger;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.WebResource;
+
+/**
+ * Helper methods for fetching responses from web services and converting
+ * them to the required formats. This can be extended to send different
+ * accept headers depending on the expected format.
+ */
+public class RestUtil {
+
+  private static final Logger log = Logger.getLogger(RestUtil.class);
+
+  /**
+   * Issues a GET request that accepts JSON and returns the response body.
+   *
+   * A fresh client is created for every call and destroyed afterwards. It is
+   * kept in a local variable so that concurrent callers never share or
+   * destroy each other's client.
+   *
+   * @param URI the address to fetch; must not be <code>null</code>
+   * @return the response body, or <code>null</code> when the request failed
+   *         with a ClientHandlerException (for example when the connection
+   *         was refused)
+   * @throws IllegalStateException when <code>URI</code> is <code>null</code>
+   */
+  public static String getResponseAsString(String URI) {
+    if (URI == null) {
+      throw new IllegalStateException("URI cannot be blank");
+    }
+
+    String response = null;
+    Client client = Client.create();
+    try {
+      WebResource resource = client.resource(URI);
+      response = resource.accept(MediaType.APPLICATION_JSON_TYPE).get(
+          String.class);
+    } catch (ClientHandlerException e) {
+      Throwable t = e.getCause();
+      if (t instanceof java.net.ConnectException) {
+        // The service being down is an expected condition; keep it short.
+        log.warn("Connect exception trying to connect to [" + URI
+            + "]. Make sure the service is running");
+      } else {
+        log.error(ExceptionUtil.getStackTrace(e));
+      }
+    } finally {
+      client.destroy();
+    }
+    return response;
+  }
+
+}
\ No newline at end of file

Added: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestOozieAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestOozieAdaptor.java?rev=1612617&view=auto
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestOozieAdaptor.java (added)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestOozieAdaptor.java Tue Jul 22 17:21:37 2014
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Checks that a payload passed to OozieAdaptor.addChunkToReceiver arrives at
+ * the registered chunk receiver unchanged. The test case itself acts as the
+ * receiver.
+ */
+public class TestOozieAdaptor extends TestCase implements ChunkReceiver {
+
+  // Set by add() once a chunk passed all checks.
+  volatile boolean receivedOK = false;
+  // Serialized form of the message the test sends.
+  public String str = null;
+
+  @Test
+  public void testMessageReceivedOk() throws Exception {
+    OozieAdaptor adaptor = new OozieAdaptor();
+    adaptor.parseArgs("TestOozieAdaptor", "0", AdaptorManager.NULL);
+    adaptor.start("id", "TestOozieAdaptor", 0, this);
+
+    byte[] payload = composeMessage().toString().getBytes();
+    int sentLength = adaptor.addChunkToReceiver(payload);
+    // the serialized json string is 84 bytes long
+    assertEquals(84, sentLength);
+
+    synchronized (this) {
+      wait(1000);
+    }
+    assertTrue(receivedOK);
+  }
+
+  /**
+   * Builds the sample metrics message and remembers its serialized form in
+   * <code>str</code> for comparison in add().
+   */
+  @SuppressWarnings("unchecked")
+  private JSONObject composeMessage() {
+    JSONObject message = new JSONObject();
+    message.put("oozie.jvm.used.memory", 10);
+    message.put("oozie.jvm.free.memory", 90);
+    message.put("oozie.jvm.total.memory", 100);
+    str = message.toString();
+    return message;
+  }
+
+  /**
+   * Receiver callback: checks data type, sequence id and content of the
+   * chunk, then flags success and wakes up a waiting test thread.
+   */
+  @Override
+  public void add(Chunk C) throws InterruptedException {
+    assertTrue(C.getDataType().equals("TestOozieAdaptor"));
+    assertEquals(C.getSeqID(), C.getData().length);
+
+    String received = new String(C.getData());
+    assertTrue(str.equals(received));
+
+    receivedOK = true;
+    synchronized (this) {
+      notify();
+    }
+  }
+}