You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2012/12/18 04:00:13 UTC
svn commit: r1423264 - in /incubator/chukwa/trunk: ./ conf/
src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/
src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/
src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/
Author: asrabkin
Date: Tue Dec 18 02:59:55 2012
New Revision: 1423264
URL: http://svn.apache.org/viewvc?rev=1423264&view=rev
Log:
CHUKWA-669. JMX Adaptor. Contributed by shreyas subramanya.
Added:
incubator/chukwa/trunk/conf/jmxremote.access
incubator/chukwa/trunk/conf/jmxremote.password
incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/
incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/JMXAgent.java
incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBean.java
incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBeanImpl.java
incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/QueueSample.java
incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
Modified:
incubator/chukwa/trunk/CHANGES.txt
incubator/chukwa/trunk/pom.xml
Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1423264&r1=1423263&r2=1423264&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Tue Dec 18 02:59:55 2012
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
NEW FEATURES
+ CHUKWA-669. JMX Adaptor. (shreyas subramanya via asrabkin)
+
CHUKWA-671. Json processors for processing JMX data from Hadoop, HBase and Zookeeper. (shreyas subramanya via asrabkin)
CHUKWA-635. Collect swap usage. (Eric Yang)
Added: incubator/chukwa/trunk/conf/jmxremote.access
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/jmxremote.access?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/conf/jmxremote.access (added)
+++ incubator/chukwa/trunk/conf/jmxremote.access Tue Dec 18 02:59:55 2012
@@ -0,0 +1,2 @@
+monitorRole readonly
+controlRole readonly
Added: incubator/chukwa/trunk/conf/jmxremote.password
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/jmxremote.password?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/conf/jmxremote.password (added)
+++ incubator/chukwa/trunk/conf/jmxremote.password Tue Dec 18 02:59:55 2012
@@ -0,0 +1,2 @@
+monitorRole a72cc3b450a0addcbe8307ed98
+controlRole a72cc3b450a0addcbe8307ed98
Modified: incubator/chukwa/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/pom.xml?rev=1423264&r1=1423263&r2=1423264&view=diff
==============================================================================
--- incubator/chukwa/trunk/pom.xml (original)
+++ incubator/chukwa/trunk/pom.xml Tue Dec 18 02:59:55 2012
@@ -390,6 +390,29 @@
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.6</version>
+ <configuration>
+ <encoding>UTF-8</encoding>
+ </configuration>
+ <executions>
+ <execution>
+ <id>chmod-jmx-file</id>
+ <phase>process-resources</phase>
+ <configuration>
+ <tasks name="setup">
+ <chmod file="target/conf/jmxremote.password" perm="600" />
+ <chmod file="target/conf/jmxremote.access" perm="600" />
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.1</version>
<executions>
@@ -456,7 +479,7 @@
</goals>
<configuration>
<skip>false</skip>
- <argLine>-Xmx1024m</argLine>
+ <argLine>-Xmx1024m -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=true -Dcom.sun.management.jmxremote.password.file=${basedir}/target/conf/jmxremote.password -Dcom.sun.management.jmxremote.access.file=${basedir}/target/conf/jmxremote.access -Dcom.sun.management.jmxremote.port=10100</argLine>
<reportsDirectory>${project.build.directory}/test-reports</reportsDirectory>
<forkMode>pertest</forkMode>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
@@ -616,9 +639,21 @@
<expandproperties/>
</filterchain>
</copy>
+ <copy file="${basedir}/conf/jmxremote.password" tofile="${test.build.dir}/conf/jmxremote.password">
+ <filterchain>
+ <expandproperties/>
+ </filterchain>
+ </copy>
+ <copy file="${basedir}/conf/jmxremote.password" tofile="${test.build.dir}/conf/jmxremote.access">
+ <filterchain>
+ <expandproperties/>
+ </filterchain>
+ </copy>
<copy file="${basedir}/src/test/resources/hbase-site.xml" tofile="${test.build.dir}/classes/hbase-site.xml"></copy>
<copy file="${basedir}/conf/log4j.properties" tofile="${test.build.dir}/conf/log4j.properties"></copy>
<copy file="${basedir}/conf/auth.conf" tofile="${test.build.dir}/conf/auth.conf"></copy>
+ <chmod file="${test.build.dir}/conf/jmxremote.password" perm="600" />
+ <chmod file="${test.build.dir}/conf/jmxremote.access" perm="600" />
</tasks>
</configuration>
<goals>
Added: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java (added)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.rmi.ConnectException;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
+
+import javax.management.Descriptor;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularType;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+
+/**
+ * Query metrics through JMX interface. <br>
+ * 1. Enable remote jmx monitoring for the target
+ * jvm by specifying -Dcom.sun.management.jmxremote.port=jmx_port<br>
+ * 2. Enable authentication with -Dcom.sun.management.jmxremote.authenticate=true <br>
+ * -Dcom.sun.management.jmxremote.password.file=${CHUKWA_CONF_DIR}/jmxremote.password <br>
+ * -Dcom.sun.management.jmxremote.access.file=${CHUKWA_CONF_DIR}/jmxremote.access <br>
+ * 3. Optionally specify these jvm options <br>
+ * -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote.ssl=false <br>
+ * 4. Connect to the jmx agent using jconsole and find out which domain you want to collect data for
+ * 5. Add the jmx adaptor. Ex: To collect metrics from a hadoop datanode that has enabled jmx on 8007, at 60s interval, use command<br>
+ * "add JMXAdaptor DatanodeProcessor localhost 8007 60 Hadoop:*" <br><br>
+ * Your JMX adaptor is now good to go and will send out the collected metrics as chunks to the collector.
+ */
+public class JMXAdaptor extends AbstractAdaptor{
+
+ private static Logger log = Logger.getLogger(JMXAdaptor.class);
+ private MBeanServerConnection mbsc = null;
+ private String port ="", server="localhost";
+ private JMXServiceURL url;
+ private JMXConnector jmxc = null;
+ private long period = 10;
+ private Timer timer;
+ private JMXTimer runner;
+ private String pattern = "";
+ long sendOffset = 0;
+ volatile boolean shutdown = false;
+
+ /**
+ * A thread which creates a new connection to JMX and retries every 10s if the connection is not
+ * successful. It uses the credentials specified in $CHUKWA_CONF_DIR/jmxremote.password.
+ */
+ public class JMXConnect implements Runnable{
+
+ @Override
+ public void run() {
+ String hadoop_conf = System.getenv("CHUKWA_CONF_DIR");
+ StringBuffer sb = new StringBuffer(hadoop_conf);
+ if(!hadoop_conf.endsWith("/")){
+ sb.append(File.separator);
+ }
+ sb.append("jmxremote.password");
+ String jmx_pw_file = sb.toString();
+ shutdown = false;
+ while(!shutdown){
+ try{
+ BufferedReader br = new BufferedReader(new FileReader(jmx_pw_file));
+ String[] creds = br.readLine().split(" ");
+ Map<String, String[]> env = new HashMap<String, String[]>();
+ env.put(JMXConnector.CREDENTIALS, creds);
+ jmxc = JMXConnectorFactory.connect(url, env);
+ mbsc = jmxc.getMBeanServerConnection();
+ if(timer == null) {
+ timer = new Timer();
+ runner = new JMXTimer(dest, JMXAdaptor.this,mbsc);
+ }
+ timer.scheduleAtFixedRate(runner, 0, period * 1000);
+ shutdown = true;
+ } catch (IOException e) {
+ log.error("IOException in JMXConnect thread prevented connect to JMX on port:"+port+", retrying after 10s");
+ log.error(ExceptionUtil.getStackTrace(e));
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e1) {
+ log.error("JMXConnect thread interrupted in sleep, bailing");
+ shutdown = true;
+ }
+ } catch (Exception e) {
+ log.error("Something bad happened in JMXConnect thread, bailing");
+ log.error(ExceptionUtil.getStackTrace(e));
+ timer.cancel();
+ timer = null;
+ shutdown = true;
+ }
+ }
+ }
+
+ }
+
+ /**
+ * A TimerTask which queries the mbean server for all mbeans that match the pattern specified in
+ * the JMXAdaptor arguments, constructs a json object of all data and sends it as a chunk. The
+ * CompositeType, TabularType and Array open mbean types return the numerical values (sizes).
+ * This task is scheduled to run at the interval specified in the adaptor arguments. If the
+ * connection to mbean server is broken, this task cancels the existing timer and tries to
+ * re-connect to the mbean server.
+ */
+
+ public class JMXTimer extends TimerTask{
+
+ private Logger log = Logger.getLogger(JMXTimer.class);
+ private ChunkReceiver receiver = null;
+ private JMXAdaptor adaptor = null;
+ private MBeanServerConnection mbsc = null;
+ //private long sendOffset = 0;
+
+ public JMXTimer(ChunkReceiver receiver, JMXAdaptor adaptor, MBeanServerConnection mbsc){
+ this.receiver = receiver;
+ this.adaptor = adaptor;
+ this.mbsc = mbsc;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+ try{
+ ObjectName query = null;
+ if(!pattern.equals("")){
+ query = new ObjectName(pattern);
+ }
+ Set<ObjectName> names = new TreeSet<ObjectName>(mbsc.queryNames(query, null));
+ Object val = null;
+ JSONObject json = new JSONObject();
+
+ for (ObjectName oname: names) {
+ MBeanInfo mbinfo = mbsc.getMBeanInfo(oname);
+ MBeanAttributeInfo [] mbinfos = mbinfo.getAttributes();
+
+ for (MBeanAttributeInfo mb: mbinfos) {
+ try{
+ Descriptor d = mb.getDescriptor();
+ val = mbsc.getAttribute(oname, mb.getName());
+ if(d.getFieldNames().length > 0){ //this is an open mbean
+ OpenType openType = (OpenType)d.getFieldValue("openType");
+
+ if(openType.isArray()){
+ Object[] valarray = (Object[])val;
+ val = Integer.toString(valarray.length);
+ }
+ else if(openType instanceof CompositeType){
+ CompositeData data = (CompositeData)val;
+ val = Integer.toString(data.values().size());
+ }
+ else if(openType instanceof TabularType){
+ TabularData data = (TabularData)val;
+ val = Integer.toString(data.size());
+ }
+ //else it is SimpleType
+ }
+ json.put(mb.getName(),val);
+ }
+ catch(Exception e){
+ log.warn("Exception "+ e.getMessage() +" getting attribute - "+mb.getName() + " Descriptor:"+mb.getDescriptor().getFieldNames().length);
+ }
+ }
+ }
+
+ byte[] data = json.toString().getBytes();
+ sendOffset+=data.length;
+ ChunkImpl c = new ChunkImpl(type, "JMX", sendOffset, data, adaptor);
+ long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+ c.addTag("timeStamp=\""+rightNow+"\"");
+ receiver.add(c);
+ }
+ catch(ConnectException e1){
+ log.error("Got connect exception for the existing MBeanServerConnection");
+ log.error(ExceptionUtil.getStackTrace(e1));
+ log.info("Make sure the target process is running. Retrying connection to JMX on port:"+port);
+ timer.cancel();
+ timer = null;
+ Thread connectThread = new Thread(new JMXConnect());
+ connectThread.start();
+ }
+ catch(Exception e){
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
+
+ }
+
+ }
+
+
+ @Override
+ public String getCurrentStatus() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(type);
+ buffer.append(" ");
+ buffer.append(server);
+ buffer.append(" ");
+ buffer.append(port);
+ buffer.append(" ");
+ buffer.append(period);
+ buffer.append(" ");
+ buffer.append(pattern);
+ return buffer.toString();
+ }
+
+ @Override
+ public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+ throws AdaptorException {
+ log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
+ try {
+ if(jmxc != null){
+ jmxc.close();
+ }
+ if(timer != null){
+ timer.cancel();
+ }
+ } catch (IOException e) {
+ log.error("JMXAdaptor shutdown failed due to IOException");
+ throw new AdaptorException(ExceptionUtil.getStackTrace(e));
+ } catch (Exception e) {
+ log.error("JMXAdaptor shutdown failed");
+ throw new AdaptorException(ExceptionUtil.getStackTrace(e));
+ }
+ //in case the start thread is still retrying
+ shutdown = true;
+ return sendOffset;
+
+
+ }
+
+ @Override
+ public void start(long offset) throws AdaptorException {
+ try {
+ sendOffset = offset;
+ Thread connectThread = new Thread(new JMXConnect());
+ connectThread.start();
+ } catch(Exception e) {
+ log.error("Failed to schedule JMX connect thread");
+ throw new AdaptorException(ExceptionUtil.getStackTrace(e));
+ }
+
+ }
+
+ @Override
+ public String parseArgs(String s) {
+ //JMXAdaptor MBeanServer port [interval] DomainNamePattern-Ex:"Hadoop:*"
+ String[] tokens = s.split(" ");
+ if(tokens.length == 4){
+ server = tokens[0];
+ port = tokens[1];
+ period = Integer.parseInt(tokens[2]);
+ pattern = tokens[3];
+ }
+ else if(tokens.length == 3){
+ server = tokens[0];
+ port = tokens[1];
+ pattern = tokens[2];
+ }
+ else{
+ log.warn("bad syntax in JMXAdaptor args");
+ return null;
+ }
+ String url_string = "service:jmx:rmi:///jndi/rmi://"+server+ ":"+port+"/jmxrmi";
+ try{
+ url = new JMXServiceURL(url_string);
+ return s;
+ }
+ catch(Exception e){
+ log.error(ExceptionUtil.getStackTrace(e));
+ }
+ return null;
+ }
+
+}
+
Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/JMXAgent.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/JMXAgent.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/JMXAgent.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/JMXAgent.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.JMX;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+
+public class JMXAgent {
+ private MBeanServer mbs = null;
+ static JMXAgent agent = null;
+ private JMXAgent(){
+ mbs = ManagementFactory.getPlatformMBeanServer();
+ }
+
+ public static MBeanServer getMBeanServerInstance(){
+ if(agent == null){
+ agent = new JMXAgent();
+ }
+ return agent.mbs;
+ }
+}
\ No newline at end of file
Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBean.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBean.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBean.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBean.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.JMX;
+
+import java.util.Map;
+
+public interface MXBean {
+ //Opentype SimpleType
+ //CompositeType
+ //ArrayType
+ //TabularType
+
+ public int getInt();
+ public String getString();
+
+ public String[] getStringArray();
+
+ public Map<Integer,String> getMap();
+
+ public QueueSample getCompositeType();
+
+}
Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBeanImpl.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBeanImpl.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBeanImpl.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBeanImpl.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.JMX;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Queue;
+
+public class MXBeanImpl implements MXBean {
+
+ private Queue<String> q;
+ private int i;
+ private String s;
+ private String[] sarray;
+ private Map<Integer, String> m;
+
+ public MXBeanImpl() {
+ q = null;
+ i = -1;
+ s = null;
+ sarray = null;
+ m = null;
+ }
+
+ public void setQueue(Queue<String> queue){
+ this.q = queue;
+ }
+
+ public void setInt(int i) {
+ this.i = i;
+ }
+
+ public void setString(String s) {
+ this.s = s;
+ }
+
+ public void setStringArray(String[] sarray) {
+ this.sarray = sarray;
+ }
+
+ public void setMap(Map<Integer, String> m) {
+ this.m = m;
+ }
+
+ public int getInt() {
+ return i;
+ }
+
+ public String getString() {
+ return s;
+ }
+
+ public String[] getStringArray() {
+ return sarray;
+ }
+
+ public Map<Integer, String> getMap() {
+ return m;
+ }
+
+ public QueueSample getCompositeType() {
+ synchronized(q) {
+ return new QueueSample(new Date(), q.size(), q.peek());
+ }
+ }
+
+}
Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/QueueSample.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/QueueSample.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/QueueSample.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/QueueSample.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.JMX;
+
+import java.util.Date;
+
+public class QueueSample {
+
+ private final Date date;
+ private final int size;
+ private final String head;
+
+ public QueueSample(Date date, int size, String head) {
+ this.date = date;
+ this.size = size;
+ this.head = head;
+ }
+
+ public Date getDate() {
+ return date;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public String getHead() {
+ return head;
+ }
+}
Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.JMX;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.conf.Configuration;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+import junit.framework.AssertionFailedError;
+import junit.framework.TestCase;
+
+public class TestJMXAdaptor extends TestCase{
+ MBeanServer mbs;
+ ChukwaAgent agent;
+ File baseDir, checkpointDir;
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ mbs = JMXAgent.getMBeanServerInstance();
+ baseDir = new File(System.getProperty("test.build.data", "/tmp")).getCanonicalFile();
+ checkpointDir = new File(baseDir, "addAdaptorTestCheckpoints");
+ createEmptyDir(checkpointDir);
+
+ Configuration conf = new Configuration();
+ conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getCanonicalPath());
+ conf.set("chukwaAgent.checkpoint.name", "checkpoint_");
+ conf.setInt("chukwaAgent.control.port", 9093);
+ conf.setInt("chukwaAgent.http.port", 9090);
+ conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
+
+ agent = new ChukwaAgent(conf);
+ }
+
+ public void testJMXAdaptor() {
+ MXBeanImpl mxbean = null;
+ try{
+ mxbean = new MXBeanImpl();
+ ObjectName name = new ObjectName("chukwa:type=test");
+ mbs.registerMBean(mxbean, name);
+ } catch(Exception e){
+ e.printStackTrace();
+ fail("Failed to instantiate and register test mbean");
+ }
+
+ Map<Integer, String> m = new HashMap<Integer, String>() {
+ private static final long serialVersionUID = 1L;
+ {
+ put(1, "a");
+ put(2, "b");
+ put(3, "c");
+ }
+ };
+ Queue<String> queue = new ArrayBlockingQueue<String>(10);
+
+ queue.add("Message1");
+ queue.add("Message2");
+ queue.add("Message3");
+
+ String[] sarray = new String[] {"Screw", "you", "guys", "I'm", "going", "home"};
+
+ mxbean.setQueue(queue);
+ mxbean.setInt(20);
+ mxbean.setMap(m);
+ mxbean.setString("TestString");
+ mxbean.setStringArray(sarray);
+
+ assertEquals(0, agent.adaptorCount());
+ System.out.println("adding jmx adaptor");
+ String id = agent.processAddCommand("add JMXAdaptor DebugProcessor localhost 10100 10 chukwa:* 0");
+ assertEquals(1, agent.adaptorCount());
+
+ //A thread that can block on ChunkQueue and can be interrupted
+ class Collector implements Runnable {
+ String fail = null;
+ public String getFailMessage(){
+ return fail;
+ }
+ public void run(){
+ try {
+ ChunkQueue eventQueue = DataFactory.getInstance().getEventQueue();
+ List<Chunk> evts = new ArrayList<Chunk>();
+ eventQueue.collect(evts, 1);
+
+ // Expected - {"CompositeType":"3","String":"TestString","StringArray":6,"Map":"3","Int":20}
+
+ for (Chunk e : evts) {
+ String data = new String(e.getData());
+ JSONObject obj = (JSONObject) JSONValue.parse(data);
+ assertEquals(obj.get("CompositeType"), "3");
+ assertEquals(obj.get("String"), "TestString");
+ assertEquals(obj.get("StringArray"), "6");
+ assertEquals(obj.get("Map"), "3");
+ assertEquals(obj.get("Int").toString(), "20");
+ System.out.println("Verified all data collected by JMXAdaptor");
+ }
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ fail = "JMXAdaptor failed to collect all data; it was interrupted";
+ } catch (AssertionFailedError e2) {
+ e2.printStackTrace();
+ fail = "Assert failed while verifying JMX data- "+e2.getMessage();
+ } catch (Exception e3) {
+ e3.printStackTrace();
+ fail = "Exception in collector thread. Check the test output for stack trace";
+ }
+ }
+ }
+
+ try {
+ Collector worker = new Collector();
+ Thread t = new Thread(worker);
+ t.start();
+ t.join(20000);
+ if(t.isAlive()){
+ t.interrupt();
+ fail("JMXAdaptor failed to collect data after 20s. Check agent log and surefire report");
+ }
+ String failMessage = worker.getFailMessage();
+ if(failMessage != null){
+ fail(failMessage);
+ }
+
+ } catch(Exception e){
+ e.printStackTrace();
+ fail("Exception in TestJMXAdaptor");
+ }
+
+ System.out.println("shutting down jmx adaptor");
+ agent.stopAdaptor(id, true);
+ assertEquals(0, agent.adaptorCount());
+ }
+
+ //returns true if dir exists
+ public static boolean nukeDirContents(File dir) {
+ if(dir.exists()) {
+ if(dir.isDirectory()) {
+ for(File f: dir.listFiles()) {
+ nukeDirContents(f);
+ f.delete();
+ }
+ } else
+ dir.delete();
+
+ return true;
+ }
+ return false;
+ }
+
+ public static void createEmptyDir(File dir) {
+ if(!nukeDirContents(dir))
+ dir.mkdir();
+ assertTrue(dir.isDirectory() && dir.listFiles().length == 0);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ nukeDirContents(checkpointDir);
+ checkpointDir.delete();
+ super.tearDown();
+ }
+}
Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.conf.Configuration;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+public class TestAddAdaptor extends TestCase {
+ ChukwaAgent agent = null;
+ File baseDir;
+
+ public void testJmxAdd() throws IOException,
+ ChukwaAgent.AlreadyRunningException, InterruptedException {
+ Configuration conf = new Configuration();
+ baseDir = new File(System.getProperty("test.build.data", "/tmp")).getCanonicalFile();
+ File checkpointDir = new File(baseDir, "addAdaptorTestCheckpoints");
+ createEmptyDir(checkpointDir);
+
+ conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getCanonicalPath());
+ conf.set("chukwaAgent.checkpoint.name", "checkpoint_");
+ conf.setInt("chukwaAgent.control.port", 0);
+ conf.setInt("chukwaAgent.http.port", 9090);
+ conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
+
+ agent = new ChukwaAgent(conf);
+
+ assertEquals(0, agent.adaptorCount());
+ System.out.println("adding jmx adaptor");
+ String id = agent.processAddCommand("add JMXAdaptor DebugProcessor localhost 0 60 hadoop:* 0");
+ assertEquals(1, agent.adaptorCount());
+
+ System.out.println("shutting down jmx adaptor");
+ agent.stopAdaptor(id, true);
+ assertEquals(0, agent.adaptorCount());
+
+ String rest_url = "http://localhost:9090/rest/v1/adaptor";
+ System.out.println("adding jmx adaptor using rest url - " + rest_url);
+
+ String dataType = "DebugProcessor", adaptorClass = "JMXAdaptor", adaptorParams = "localhost 0 60 hadoop:*", offset = "0";
+ String adaptor_json = "{\"DataType\":\""+dataType+"\", \"AdaptorClass\":\""+adaptorClass+"\", \"AdaptorParams\" : \""+adaptorParams+"\", \"Offset\" : \""+offset+"\" }";
+ System.out.println(adaptor_json);
+ Client client = Client.create();
+ WebResource resource = client.resource(rest_url);
+ ClientResponse response = resource.type("application/json")
+ .post(ClientResponse.class, adaptor_json);
+ if (response.getStatus() != 200 && response.getStatus() != 201) {
+ fail("Add adaptor through REST failed : HTTP error code : " + response.getStatus());
+ }
+ assertEquals(1, agent.adaptorCount());
+ String result = response.getEntity(String.class);
+
+ try{
+ InputStream inputStream = new ByteArrayInputStream(result.getBytes());
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder builder = factory.newDocumentBuilder();
+ Document doc = builder.parse(inputStream);
+ NodeList list = doc.getElementsByTagName("Adaptor");
+ Node node = list.item(0);
+ id = node.getAttributes().getNamedItem("id").getNodeValue();
+ } catch(Exception e){
+ fail("Failed to parse response from add. Complete response is:\n"+result);
+ }
+
+
+ System.out.println("shutting down jmx adaptor with id:"+id+" through rest");
+ resource = client.resource(rest_url+"/"+id);
+ response = resource.delete(ClientResponse.class);
+ if (response.getStatus() != 200 && response.getStatus() != 201) {
+ fail("Delete adaptor through REST failed : HTTP error code : " + response.getStatus());
+ }
+
+ assertEquals(0, agent.adaptorCount());
+
+ agent.shutdown();
+ Thread.sleep(1500);
+ nukeDirContents(checkpointDir);
+ checkpointDir.delete();
+ }
+
+ protected void tearDown(){
+ if(agent != null){
+ agent.shutdown();
+ }
+ }
+
+ //returns true if dir exists
+ public static boolean nukeDirContents(File dir) {
+ if(dir.exists()) {
+ if(dir.isDirectory()) {
+ for(File f: dir.listFiles()) {
+ nukeDirContents(f);
+ f.delete();
+ }
+ } else
+ dir.delete();
+
+ return true;
+ }
+ return false;
+ }
+
+ public static void createEmptyDir(File dir) {
+ if(!nukeDirContents(dir))
+ dir.mkdir();
+ assertTrue(dir.isDirectory() && dir.listFiles().length == 0);
+ }
+}