You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2014/07/22 18:28:21 UTC

svn commit: r1612604 - in /chukwa/trunk: CHANGES.txt src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestRestAdaptor.java

Author: eyang
Date: Tue Jul 22 16:28:21 2014
New Revision: 1612604

URL: http://svn.apache.org/r1612604
Log:
CHUKWA-712. Implemented generic REST Adaptor for fetching data from web services.  (Sreepathi Prasanna via Eric Yang)

Added:
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestRestAdaptor.java
Modified:
    chukwa/trunk/CHANGES.txt

Modified: chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/chukwa/trunk/CHANGES.txt?rev=1612604&r1=1612603&r2=1612604&view=diff
==============================================================================
--- chukwa/trunk/CHANGES.txt (original)
+++ chukwa/trunk/CHANGES.txt Tue Jul 22 16:28:21 2014
@@ -12,6 +12,8 @@ Release 0.6 - Unreleased
 
   NEW FEATURES
 
+    CHUKWA-712. Implemented generic REST Adaptor for fetching data from web services.  (Sreepathi Prasanna via Eric Yang)
+
     CHUKWA-674. Integrated Chukwa collector feature to Chukwa Agent.  (Eric Yang)
 
     CHUKWA-705. Updated Chukwa to support JDK7 and updated to Hadoop 1.2.1 and HBase 0.96.1.1.  (Eric Yang)

Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java?rev=1612604&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java (added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java Tue Jul 22 16:28:21 2014
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.nio.charset.Charset;
+import java.util.Calendar;
+import java.util.TimeZone;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.WebResource;
+
+import javax.ws.rs.core.MediaType;
+
+/**
+ * Adaptor that polls a REST endpoint at a fixed interval and forwards each
+ * response body, requested as JSON, to the chunk receiver as a single chunk.
+ *
+ * Adaptor arguments: <code>uri period</code>, where <code>period</code> is the
+ * polling interval in seconds.
+ */
+public class RestAdaptor extends AbstractAdaptor {
+
+  private static final Logger log = Logger.getLogger(RestAdaptor.class);
+  /** Explicit payload encoding, independent of the platform default charset. */
+  private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  private String uri;
+  /** Polling interval in seconds. */
+  private long period = 60;
+  private Client c;
+  private Timer timer;
+  private TimerTask runner;
+  /** Written by the timer thread, read by shutdown(); hence volatile. */
+  private volatile long sendOffset;
+
+  /** Timer task that performs one GET per run and emits the body as a chunk. */
+  class RestTimer extends TimerTask {
+
+    private ChunkReceiver receiver;
+    private RestAdaptor adaptor;
+
+    RestTimer(ChunkReceiver receiver, RestAdaptor adaptor) {
+      this.receiver = receiver;
+      this.adaptor = adaptor;
+    }
+
+    @Override
+    public void run() {
+      try {
+        WebResource resource = c.resource(uri);
+        String body = resource.accept(MediaType.APPLICATION_JSON_TYPE).get(
+            String.class);
+        byte[] data = body.getBytes(UTF_8);
+        // The sequence ID is the offset just past the last byte of this chunk.
+        long endOffset = sendOffset + data.length;
+        ChunkImpl chunk = new ChunkImpl(type, "REST", endOffset, data, adaptor);
+        long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+            .getTimeInMillis();
+        chunk.addTag("timeStamp=\"" + rightNow + "\"");
+        sendOffset = endOffset;
+        receiver.add(chunk);
+      } catch (com.sun.jersey.api.client.ClientHandlerException e) {
+        Throwable t = e.getCause();
+        if (t instanceof java.net.ConnectException) {
+          log.warn("Connect exception trying to connect to " + uri
+              + ". Make sure the service is running");
+        } else {
+          log.error("RestAdaptor: Failed to fetch data from " + uri);
+          log.error(ExceptionUtil.getStackTrace(e));
+        }
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to whoever owns this thread.
+        Thread.currentThread().interrupt();
+        log.warn("RestAdaptor: Interrupted while delivering data from " + uri);
+      } catch (Exception e) {
+        // Catch everything else so one bad poll does not kill the timer thread.
+        log.error("RestAdaptor: Unexpected error while polling " + uri);
+        log.error(ExceptionUtil.getStackTrace(e));
+      }
+    }
+  }
+
+  /** Returns the adaptor status as "type uri period". */
+  @Override
+  public String getCurrentStatus() {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append(type);
+    buffer.append(" ");
+    buffer.append(uri);
+    buffer.append(" ");
+    buffer.append(period);
+    return buffer.toString();
+  }
+
+  /**
+   * Stops polling. Safe to call when the adaptor was never started.
+   *
+   * @return the offset reached by the chunks emitted so far
+   */
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+      throws AdaptorException {
+    if (timer != null) {
+      timer.cancel();
+      // A cancelled Timer cannot be reused; drop it so start() builds a new one.
+      timer = null;
+      runner = null;
+    }
+    return sendOffset;
+  }
+
+  /**
+   * Starts polling immediately and then every <code>period</code> seconds.
+   *
+   * @param offset offset to resume counting from
+   */
+  @Override
+  public void start(long offset) throws AdaptorException {
+    sendOffset = offset;
+    if (timer != null) {
+      timer.cancel();
+    }
+    // Timer and TimerTask are single-use, so every start gets a fresh pair.
+    timer = new Timer();
+    runner = new RestTimer(dest, RestAdaptor.this);
+    timer.scheduleAtFixedRate(runner, 0, period * 1000);
+  }
+
+  /**
+   * Parses the adaptor arguments <code>uri period</code>.
+   *
+   * @param s whitespace-separated URI and polling period in seconds
+   * @return <code>s</code> on success, or null if the arguments are invalid
+   */
+  @Override
+  public String parseArgs(String s) {
+    String[] tokens = s.trim().split("\\s+");
+    if (tokens.length != 2) {
+      log.warn("bad syntax in RestAdaptor args");
+      return null;
+    }
+    int seconds;
+    try {
+      seconds = Integer.parseInt(tokens[1]);
+    } catch (NumberFormatException e) {
+      log.warn("RestAdaptor: incorrect argument for period. Expecting number");
+      return null;
+    }
+    if (seconds <= 0) {
+      // Timer.scheduleAtFixedRate rejects non-positive periods.
+      log.warn("RestAdaptor: period must be a positive number of seconds");
+      return null;
+    }
+    uri = tokens[0];
+    period = seconds;
+    c = Client.create();
+    return s;
+  }
+
+}

Added: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestRestAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestRestAdaptor.java?rev=1612604&view=auto
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestRestAdaptor.java (added)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestRestAdaptor.java Tue Jul 22 16:28:21 2014
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.json.simple.JSONObject;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+
+/**
+ * Starts a local Jetty servlet that serves a fixed JSON document and checks
+ * that RestAdaptor delivers it as a chunk.
+ */
+public class TestRestAdaptor extends TestCase implements ChunkReceiver {
+
+  private Server jettyServer = null;
+  private JSONObject metricsMap = new JSONObject();
+  private static String args = "http://localhost:9090/metrics/instrumentation/data 2";
+  // Chunks arrive on the adaptor's timer thread; hand the first one to the test thread.
+  private final CountDownLatch received = new CountDownLatch(1);
+  private volatile Chunk firstChunk;
+
+  @Before
+  public void setUp() throws Exception {
+    metricsMap.put("FreeSpace", "10GB");
+    metricsMap.put("UsedSpace", "90GB");
+    metricsMap.put("maps_killed", "20");
+
+    jettyServer = new Server(9090);
+    Context root = new Context(jettyServer, "/metrics/instrumentation/data",
+        Context.SESSIONS);
+    root.addServlet(new ServletHolder(new RestServlet()), "/*");
+    System.out.println(" Rest Server starting..");
+
+    jettyServer.start();
+    jettyServer.setStopAtShutdown(true);
+  }
+
+  @Test
+  public void testMessageReceivedOk() throws Exception {
+    RestAdaptor restAdaptor = new RestAdaptor();
+    assertNotNull(restAdaptor.parseArgs(args));
+    restAdaptor.start("id", "TestRestAdaptor", 0, this);
+    try {
+      assertTrue("no chunk received from RestAdaptor",
+          received.await(10, TimeUnit.SECONDS));
+    } finally {
+      // Stop polling so the timer thread does not outlive the test.
+      restAdaptor.shutdown(AdaptorShutdownPolicy.HARD_STOP);
+    }
+    // Assert on the test thread: a failure on the timer thread would not fail the test.
+    Chunk event = firstChunk;
+    assertEquals("TestRestAdaptor", event.getDataType());
+    assertEquals((long) event.getData().length, event.getSeqID());
+    assertEquals(metricsMap.toString(), new String(event.getData(), "UTF-8"));
+  }
+
+  @Override
+  public void add(Chunk event) throws InterruptedException {
+    if (received.getCount() > 0) {
+      firstChunk = event;
+      received.countDown();
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (jettyServer != null) {
+      jettyServer.stop();
+    }
+  }
+
+  private class RestServlet extends HttpServlet {
+    private static final long serialVersionUID = -8007387020169769539L;
+
+    protected void doGet(HttpServletRequest request,
+        HttpServletResponse response) throws ServletException, IOException {
+      response.getWriter().write(metricsMap.toString());
+    }
+  }
+}