You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2014/07/22 18:28:21 UTC
svn commit: r1612604 - in /chukwa/trunk: CHANGES.txt
src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java
src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestRestAdaptor.java
Author: eyang
Date: Tue Jul 22 16:28:21 2014
New Revision: 1612604
URL: http://svn.apache.org/r1612604
Log:
CHUKWA-712. Implemented generic REST Adaptor for fetch data from web service. (Sreepathi Prasanna via Eric Yang)
Added:
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestRestAdaptor.java
Modified:
chukwa/trunk/CHANGES.txt
Modified: chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/chukwa/trunk/CHANGES.txt?rev=1612604&r1=1612603&r2=1612604&view=diff
==============================================================================
--- chukwa/trunk/CHANGES.txt (original)
+++ chukwa/trunk/CHANGES.txt Tue Jul 22 16:28:21 2014
@@ -12,6 +12,8 @@ Release 0.6 - Unreleased
NEW FEATURES
+ CHUKWA-712. Implemented generic REST Adaptor for fetch data from web service. (Sreepathi Prasanna via Eric Yang)
+
CHUKWA-674. Integrated Chukwa collector feature to Chukwa Agent. (Eric Yang)
CHUKWA-705. Updated Chukwa to support JDK7 and updated to Hadoop 1.2.1 and HBase 0.96.1.1. (Eric Yang)
Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java?rev=1612604&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java (added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/RestAdaptor.java Tue Jul 22 16:28:21 2014
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.util.Calendar;
+import java.util.TimeZone;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.WebResource;
+
+import javax.ws.rs.core.MediaType;
+
+public class RestAdaptor extends AbstractAdaptor {
+
+ private String uri;
+ private long period = 60;
+ private static Logger log = Logger.getLogger(RestAdaptor.class);
+ private WebResource resource;
+ private Client c;
+ private String bean;
+ private Timer timer;
+ private TimerTask runner;
+ private long sendOffset;
+
+ class RestTimer extends TimerTask {
+
+ private ChunkReceiver receiver;
+ private RestAdaptor adaptor;
+
+ RestTimer(ChunkReceiver receiver, RestAdaptor adaptor) {
+ this.receiver = receiver;
+ this.adaptor = adaptor;
+ }
+
+ @Override
+ public void run() {
+ try {
+ resource = c.resource(uri);
+ bean = resource.accept(MediaType.APPLICATION_JSON_TYPE).get(
+ String.class);
+ byte[] data = bean.getBytes();
+ sendOffset += data.length;
+ ChunkImpl c = new ChunkImpl(type, "REST", sendOffset, data, adaptor);
+ long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+ .getTimeInMillis();
+ c.addTag("timeStamp=\"" + rightNow + "\"");
+ receiver.add(c);
+ } catch (com.sun.jersey.api.client.ClientHandlerException e) {
+ Throwable t = e.getCause();
+ if (t instanceof java.net.ConnectException) {
+ log.warn("Connect exception trying to connect to " + uri
+ + ". Make sure the service is running");
+ } else {
+ log.error("RestAdaptor: Interrupted exception");
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
+ } catch (Exception e) {
+ log.error("RestAdaptor: Interrupted exception");
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
+ }
+ }
+
+ @Override
+ public String getCurrentStatus() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(type);
+ buffer.append(" ");
+ buffer.append(uri);
+ buffer.append(" ");
+ buffer.append(period);
+ return buffer.toString();
+ }
+
+ @Override
+ public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+ throws AdaptorException {
+ timer.cancel();
+ return sendOffset;
+ }
+
+ @Override
+ public void start(long offset) throws AdaptorException {
+ sendOffset = offset;
+ if (timer == null) {
+ timer = new Timer();
+ runner = new RestTimer(dest, RestAdaptor.this);
+ }
+ timer.scheduleAtFixedRate(runner, 0, period * 1000);
+ }
+
+ @Override
+ public String parseArgs(String s) {
+ // RestAdaptor [Host] port uri [interval]
+ String[] tokens = s.split(" ");
+ if (tokens.length == 2) {
+ uri = tokens[0];
+ try {
+ period = Integer.parseInt(tokens[1]);
+ } catch (NumberFormatException e) {
+ log.warn("RestAdaptor: incorrect argument for period. Expecting number");
+ return null;
+ }
+ } else {
+ log.warn("bad syntax in RestAdaptor args");
+ return null;
+ }
+ c = Client.create();
+ return s;
+ }
+
+}
Added: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestRestAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestRestAdaptor.java?rev=1612604&view=auto
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestRestAdaptor.java (added)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestRestAdaptor.java Tue Jul 22 16:28:21 2014
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.json.simple.JSONObject;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+
+public class TestRestAdaptor extends TestCase implements ChunkReceiver {
+
+ private Server jettyServer = null;
+ private JSONObject metricsMap = new JSONObject();
+ private static String args = "http://localhost:9090/metrics/instrumentation/data 2";
+ private static long offset = 0;
+
+ @Before
+ public void setUp() throws Exception {
+ metricsMap.put("FreeSpace", "10GB");
+ metricsMap.put("UsedSpace", "90GB");
+ metricsMap.put("maps_killed", "20");
+
+ jettyServer = new Server(9090);
+ Context root = new Context(jettyServer, "/metrics/instrumentation/data",
+ Context.SESSIONS);
+ root.addServlet(new ServletHolder(new RestServlet()), "/*");
+ System.out.println(" Rest Server starting..");
+
+ jettyServer.start();
+ jettyServer.setStopAtShutdown(true);
+ }
+
+ @Test
+ public void testMessageReceivedOk() throws Exception {
+ RestAdaptor restAdaptor = new RestAdaptor();
+ restAdaptor.parseArgs(args);
+ restAdaptor.start("id", "TestRestAdaptor", 0, this);
+ Thread.sleep(2000); // wait for processing
+ }
+
+ @Override
+ public void add(Chunk event) throws InterruptedException {
+ offset += event.getData().length;
+ assertTrue(event.getDataType().equals("TestRestAdaptor"));
+ assertEquals(event.getSeqID(), offset);
+ assertTrue(metricsMap.toString().equals(new String(event.getData())));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (jettyServer != null) {
+ jettyServer.stop();
+ }
+ }
+
+ private class RestServlet extends HttpServlet {
+ private static final long serialVersionUID = -8007387020169769539L;
+
+ protected void doGet(HttpServletRequest request,
+ HttpServletResponse response) throws ServletException, IOException {
+ response.getWriter().write(metricsMap.toString());
+ }
+ }
+}