You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2010/04/29 19:26:23 UTC

svn commit: r939394 - in /hadoop/chukwa/trunk: CHANGES.txt src/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java

Author: asrabkin
Date: Thu Apr 29 17:26:23 2010
New Revision: 939394

URL: http://svn.apache.org/viewvc?rev=939394&view=rev
Log:
CHUKWA-479. Support HTTP trigger actions. Contributed by Bill Graham.

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=939394&r1=939393&r2=939394&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Apr 29 17:26:23 2010
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    CHUKWA-479.  Support HTTP trigger actions (Bill Graham via asrabkin)
+
     CHUKWA-469. Add JMSAdaptor. (Bill Graham via asrabkin)
 
     CHUKWA-477. Support post-demux trigger.  (Bill Graham via Eric Yang)

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java?rev=939394&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java Thu Apr 29 17:26:23 2010
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datatrigger;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.io.OutputStreamWriter;
+import java.net.URL;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Trigger action that makes an HTTP request when executed.
+ * <P>
+ * To use this trigger, two types of configurations must be set. First, this class
+ * must be configured to be invoked for a given trigger event. Second, the
+ * the relevant settings for the HTTP request(s) to be made must be set as
+ * described below.
+ * <P>
+ * The general format of this classes configs is
+ * <code>chukwa.trigger.[eventName].http.[N].[paramName]</code> where
+ * <code>eventName</code> is the name of the event the request values are bound
+ * to (see TriggerEvent), <code>N</code> is a counter for each request configured (starting at 1)
+ * and <code>paramName</code> is the request parameter being set.
+ * <P>
+ * Using the post demux success trigger event as an example, the first request
+ * to be fired would use the following configurations
+ * <ul>
+ * <li><code>chukwa.trigger.post.demux.success.http.1.url</code> - The HTTP url to
+ * invoke.</li>
+ * <li><code>chukwa.trigger.post.demux.success.http.1.method</code> - The HTTP method
+ * (optional, default=GET).</li>
+ * <li><code>chukwa.trigger.post.demux.success.http.1.headers</code> - A comma-delimited
+ * set of HTTP headers (in <code>[headerName]:[headerValue]</code> form) to
+ * include (optional).</li>
+ * <li><code>chukwa.trigger.post.demux.success.http.1.body</code> - The text HTTP body
+ * to include (optional).</li>
+ * <li><code>chukwa.trigger.post.demux.success.http.1.connect.timeout</code> - The
+ * HTTP connection timeout setting in milliseconds (optional, default=5000ms).</li>
+ * <li><code>chukwa.trigger.post.demux.success.http.1.read.timeout</code> - The
+ * HTTP read timeout setting in milliseconds (optional, default=5000ms).</li>
+ * </ul>
+ * @see TriggerAction
+ * @see TriggerEvent
+ */
+public class HttpTriggerAction implements TriggerAction {
+  protected Log log = LogFactory.getLog(getClass());
+
+
+  /**
+   * Iterates over each URL found, fetched other settings and fires and HTTP
+   * request.
+   *
+   * @param conf
+   * @param fs
+   * @param src
+   * @param triggerEvent
+   * @throws IOException
+   */
+  public void execute(Configuration conf, FileSystem fs,
+                      FileStatus[] src, TriggerEvent triggerEvent) throws IOException {
+
+    if (log.isDebugEnabled()) {
+      for (FileStatus file : src) {
+          log.debug("Execute file: " + file.getPath());
+      }
+    }
+
+    int reqNumber = 1;
+    URL url = null;
+    while ((url = getUrl(conf, triggerEvent, reqNumber)) != null) {
+
+      // get settings for this request
+      String method = getMethod(conf, triggerEvent, reqNumber);
+      Map<String, String> headers = getHeaders(conf, triggerEvent, reqNumber);
+      String body = getBody(conf, triggerEvent, reqNumber);
+      int connectTimeout = getConnectTimeout(conf, triggerEvent, reqNumber);
+      int readTimeout = getReadTimeout(conf, triggerEvent, reqNumber);
+
+      try {
+        // make the request
+        makeHttpRequest(url, method, headers, body, connectTimeout, readTimeout);
+      }
+      catch(Exception e) {
+        log.error("Error making request to " + url, e);
+      }
+      reqNumber++;
+    }
+  }
+
+  private void makeHttpRequest(URL url, String method,
+                               Map<String, String> headers, String body,
+                               int connectTimeout, int readTimeout) throws IOException {
+    if (url == null) {
+      return;
+    }
+
+    // initialize the connection
+    HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+    conn.setRequestMethod(method);
+    conn.setDoInput(true);
+    conn.setConnectTimeout(connectTimeout);
+    conn.setReadTimeout(readTimeout);
+
+    // set headers
+    boolean contentLengthExists = false;
+    if (headers != null) {
+      for (String name: headers.keySet()) {
+        if (log.isDebugEnabled()) {
+          log.debug("Setting header " + name + ": " + headers.get(name));
+        }
+        if (name.equalsIgnoreCase("content-length")) {
+          contentLengthExists = true;
+        }
+        conn.setRequestProperty(name, headers.get(name));
+      }
+    }
+
+    // set content-length if not already set
+    if (!"GET".equals(method) && !contentLengthExists) {
+      String contentLength = body != null ? String.valueOf(body.length()) : "0";
+      conn.setRequestProperty("Content-Length", contentLength);
+    }
+
+    // send body if it exists
+    if (body != null) {
+      conn.setDoOutput(true);
+      OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream());
+      writer.write(body);
+      writer.flush();
+      writer.close();
+    }
+    else {
+      conn.setDoOutput(false);
+    }
+
+    // read reponse code/message and dump response
+    log.info("Making HTTP " + method + " to: " + url);
+    int responseCode = conn.getResponseCode();
+    log.info("HTTP Response code: " + responseCode);
+
+
+    if (responseCode != 200) {
+      log.info("HTTP Response message: " + conn.getResponseMessage());
+    }
+    else {
+      BufferedReader reader = new BufferedReader(
+                                new InputStreamReader(conn.getInputStream()));
+      String line;
+      StringBuilder sb = new StringBuilder();
+      while ((line = reader.readLine()) != null) {
+        if(sb.length() > 0) {
+          sb.append("\n");
+        }
+        sb.append(line);
+      }
+      log.info("HTTP Response:\n" + sb);
+
+      reader.close();
+    }
+
+    conn.disconnect();
+  }
+
+  protected URL getUrl(Configuration conf,
+                       TriggerEvent triggerEvent,
+                       int reqNumber) throws MalformedURLException {
+    String urlString = conf.get(getConfigKey(triggerEvent, reqNumber, "url"), null);
+    if (urlString == null) {
+      return null;
+    }
+
+    return new URL(urlString);
+  }
+
+  protected String getMethod(Configuration conf,
+                             TriggerEvent triggerEvent,
+                             int reqNumber) {
+    return conf.get(getConfigKey(triggerEvent, reqNumber, "method"), "GET");
+  }
+
+  protected Map<String, String> getHeaders(Configuration conf,
+                                           TriggerEvent triggerEvent,
+                                           int reqNumber) {
+    Map<String, String> headerMap = new HashMap<String,String>();
+
+    String headers = conf.get(getConfigKey(triggerEvent, reqNumber, "headers"), null);
+
+    if (headers != null) {
+      String[] headersSplit = headers.split(",");
+      for (String header : headersSplit) {
+        String[] nvp = header.split(":", 2);
+        if (nvp.length < 2) {
+          log.error("Invalid HTTP header found: " + nvp);
+          continue;
+        }
+        headerMap.put(nvp[0].trim(), nvp[1].trim());
+      }
+    }
+
+    return headerMap;
+  }
+
+  protected String getBody(Configuration conf,
+                           TriggerEvent triggerEvent,
+                           int reqNumber) {
+    return conf.get(getConfigKey(triggerEvent, reqNumber, "body"), "GET");
+  }
+
+  protected int getConnectTimeout(Configuration conf,
+                                 TriggerEvent triggerEvent,
+                                 int reqNumber) {
+    String timeout = conf.get(getConfigKey(triggerEvent, reqNumber, "connect.timeout"), null);
+    return timeout != null ? Integer.parseInt(timeout) : 5000;
+  }
+
+  
+  protected int getReadTimeout(Configuration conf,
+                              TriggerEvent triggerEvent,
+                              int reqNumber) {
+    String timeout = conf.get(getConfigKey(triggerEvent, reqNumber, "read.timeout"), null);
+    return timeout != null ? Integer.parseInt(timeout) : 5000;
+  }
+
+  private String getConfigKey(TriggerEvent triggerEvent, int reqNumber, String name) {
+    return triggerEvent.getConfigKeyBase() + ".http." + reqNumber + "." + name;
+  }
+}