You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2012/12/18 04:00:13 UTC

svn commit: r1423264 - in /incubator/chukwa/trunk: ./ conf/ src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/

Author: asrabkin
Date: Tue Dec 18 02:59:55 2012
New Revision: 1423264

URL: http://svn.apache.org/viewvc?rev=1423264&view=rev
Log:
CHUKWA-669. JMX Adaptor.  Contributed by shreyas subramanya.

Added:
    incubator/chukwa/trunk/conf/jmxremote.access
    incubator/chukwa/trunk/conf/jmxremote.password
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
    incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/
    incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/JMXAgent.java
    incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBean.java
    incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBeanImpl.java
    incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/QueueSample.java
    incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
    incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
Modified:
    incubator/chukwa/trunk/CHANGES.txt
    incubator/chukwa/trunk/pom.xml

Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1423264&r1=1423263&r2=1423264&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Tue Dec 18 02:59:55 2012
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    CHUKWA-669. JMX Adaptor.  (shreyas subramanya via asrabkin)
+
     CHUKWA-671. Json processors for processing JMX data from Hadoop, HBase and Zookeeper.  (shreyas subramanya via asrabkin)
 
     CHUKWA-635. Collect swap usage. (Eric Yang)

Added: incubator/chukwa/trunk/conf/jmxremote.access
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/jmxremote.access?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/conf/jmxremote.access (added)
+++ incubator/chukwa/trunk/conf/jmxremote.access Tue Dec 18 02:59:55 2012
@@ -0,0 +1,2 @@
+monitorRole readonly
+controlRole readonly

Added: incubator/chukwa/trunk/conf/jmxremote.password
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/jmxremote.password?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/conf/jmxremote.password (added)
+++ incubator/chukwa/trunk/conf/jmxremote.password Tue Dec 18 02:59:55 2012
@@ -0,0 +1,2 @@
+monitorRole a72cc3b450a0addcbe8307ed98
+controlRole a72cc3b450a0addcbe8307ed98

Modified: incubator/chukwa/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/pom.xml?rev=1423264&r1=1423263&r2=1423264&view=diff
==============================================================================
--- incubator/chukwa/trunk/pom.xml (original)
+++ incubator/chukwa/trunk/pom.xml Tue Dec 18 02:59:55 2012
@@ -390,6 +390,29 @@
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <version>1.6</version>
+                <configuration>
+                    <encoding>UTF-8</encoding>
+                </configuration>
+                <executions>
+                        <execution>
+                        <id>chmod-jmx-file</id>
+                        <phase>process-resources</phase>
+                        <configuration>
+                            <tasks name="setup">
+                                <chmod file="target/conf/jmxremote.password" perm="600" />
+                                <chmod file="target/conf/jmxremote.access" perm="600" />
+                            </tasks>
+                        </configuration>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-jar-plugin</artifactId>
                 <version>2.3.1</version>
                 <executions>
@@ -456,7 +479,7 @@
                         </goals>
                         <configuration>
                             <skip>false</skip>
-                            <argLine>-Xmx1024m</argLine>
+                            <argLine>-Xmx1024m -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=true -Dcom.sun.management.jmxremote.password.file=${basedir}/target/conf/jmxremote.password -Dcom.sun.management.jmxremote.access.file=${basedir}/target/conf/jmxremote.access -Dcom.sun.management.jmxremote.port=10100</argLine>
                             <reportsDirectory>${project.build.directory}/test-reports</reportsDirectory>
                             <forkMode>pertest</forkMode>
                             <redirectTestOutputToFile>true</redirectTestOutputToFile>
@@ -616,9 +639,21 @@
                                         <expandproperties/>
                                     </filterchain>
                                 </copy>
+                                <copy file="${basedir}/conf/jmxremote.password" tofile="${test.build.dir}/conf/jmxremote.password">
+                                    <filterchain>
+                                        <expandproperties/>
+                                    </filterchain>
+                                </copy>
+                                <copy file="${basedir}/conf/jmxremote.access" tofile="${test.build.dir}/conf/jmxremote.access">
+                                    <filterchain>
+                                        <expandproperties/>
+                                    </filterchain>
+                                </copy>
                                 <copy file="${basedir}/src/test/resources/hbase-site.xml" tofile="${test.build.dir}/classes/hbase-site.xml"></copy>
                                 <copy file="${basedir}/conf/log4j.properties" tofile="${test.build.dir}/conf/log4j.properties"></copy>
                                 <copy file="${basedir}/conf/auth.conf" tofile="${test.build.dir}/conf/auth.conf"></copy>
+                                <chmod file="${test.build.dir}/conf/jmxremote.password" perm="600" />
+                                <chmod file="${test.build.dir}/conf/jmxremote.access" perm="600" />
                             </tasks>
                         </configuration>
                         <goals>

Added: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java (added)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.rmi.ConnectException;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeSet;
+
+import javax.management.Descriptor;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularType;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+
+/**
+ * Query metrics through JMX interface. <br>
+ * 1. Enable remote jmx monitoring for the target
+ * jvm by specifying -Dcom.sun.management.jmxremote.port=jmx_port<br>
+ * 2. Enable authentication with -Dcom.sun.management.jmxremote.authenticate=true <br>
+ *    -Dcom.sun.management.jmxremote.password.file=${CHUKWA_CONF_DIR}/jmxremote.password <br>
+ *    -Dcom.sun.management.jmxremote.access.file=${CHUKWA_CONF_DIR}/jmxremote.access <br>
+ * 3. Optionally specify these jvm options <br>
+ * 	  -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote.ssl=false <br>
+ * 4. Connect to the jmx agent using jconsole and find out which domain you want to collect data for <br>
+ * 5. Add the jmx adaptor. Ex: To collect metrics from a hadoop datanode that has enabled jmx on 8007, at 60s interval, starting at offset 0, use command<br>
+ *   "add JMXAdaptor DatanodeProcessor localhost 8007 60 Hadoop:* 0" <br><br>
+ * Your JMX adaptor is now good to go and will send out the collected metrics as chunks to the collector.
+ */
+public class JMXAdaptor extends AbstractAdaptor{
+
+	private static Logger log = Logger.getLogger(JMXAdaptor.class);
+	private MBeanServerConnection mbsc = null;
+	private String port ="", server="localhost";	
+	private JMXServiceURL url;
+	private JMXConnector jmxc = null;
+	private long period = 10;
+	private Timer timer;
+	private JMXTimer runner;
+	private String pattern = "";
+	long sendOffset = 0;
+	volatile boolean shutdown = false;
+	
+	/**
+	 * Thread body which opens the connection to the JMX agent and, once connected, schedules the
+	 * JMXTimer at the configured period. If connecting fails with an IOException it retries every
+	 * 10s until it succeeds, is interrupted, or the adaptor's shutdown flag is set. The credentials
+	 * are the whitespace-separated tokens of the first line of $CHUKWA_CONF_DIR/jmxremote.password.
+	 */
+	public class JMXConnect implements Runnable{
+
+		/**
+		 * Reads the credentials from the first line of the password file.
+		 *
+		 * @param pwFile path of the jmxremote.password file
+		 * @return the whitespace-separated tokens of the first line
+		 * @throws IOException if the file cannot be opened or read
+		 * @throws IllegalStateException if the file is empty
+		 */
+		private String[] readCredentials(String pwFile) throws IOException {
+			BufferedReader br = new BufferedReader(new FileReader(pwFile));
+			try {
+				String line = br.readLine();
+				if(line == null){
+					throw new IllegalStateException("JMX password file is empty: "+pwFile);
+				}
+				return line.trim().split("\\s+");
+			} finally {
+				//the file is re-read on every retry, so it must not stay open
+				br.close();
+			}
+		}
+
+		@Override
+		public void run() {
+			String chukwa_conf = System.getenv("CHUKWA_CONF_DIR");
+			if(chukwa_conf == null){
+				//without the conf dir the password file cannot be located; retrying would not help
+				log.error("CHUKWA_CONF_DIR is not set, cannot locate jmxremote.password, bailing");
+				return;
+			}
+			String jmx_pw_file = new File(chukwa_conf, "jmxremote.password").getPath();
+			shutdown = false;
+			while(!shutdown){
+				try{
+					String[] creds = readCredentials(jmx_pw_file);
+					Map<String, String[]> env = new HashMap<String, String[]>();
+					env.put(JMXConnector.CREDENTIALS, creds);
+					jmxc = JMXConnectorFactory.connect(url, env);
+					mbsc = jmxc.getMBeanServerConnection();
+					if(timer == null) {
+						timer = new Timer();
+						runner = new JMXTimer(dest, JMXAdaptor.this,mbsc);
+					}
+					timer.scheduleAtFixedRate(runner, 0, period * 1000);
+					//connected and scheduled: leave the retry loop
+					shutdown = true;
+				} catch (IOException e) {
+					log.error("IOException in JMXConnect thread prevented connect to JMX on port:"+port+", retrying after 10s");
+					log.error(ExceptionUtil.getStackTrace(e));
+					try {
+						Thread.sleep(10000);
+					} catch (InterruptedException e1) {
+						log.error("JMXConnect thread interrupted in sleep, bailing");
+						//keep the interrupt visible to whoever owns this thread
+						Thread.currentThread().interrupt();
+						shutdown = true;
+					}
+				} catch (Exception e) {
+					log.error("Something bad happened in JMXConnect thread, bailing");
+					log.error(ExceptionUtil.getStackTrace(e));
+					//the timer may not have been created yet when the failure happened
+					if(timer != null){
+						timer.cancel();
+						timer = null;
+					}
+					shutdown = true;
+				}
+			}
+		}
+
+	}
+	
+	/**
+	 * A TimerTask which queries the mbean server for all mbeans that match the pattern specified in
+	 * the JMXAdaptor arguments, constructs a json object of all data and sends it as a chunk. The 
+	 * CompositeType, TabularType and Array open mbean types return the numerical values (sizes). 
+	 * This task is scheduled to run at the interval specified in the adaptor arguments. If the 
+	 * connection to mbean server is broken, this task cancels the existing timer and tries to 
+	 * re-connect to the mbean server.  
+	 */
+	
+	public class JMXTimer extends TimerTask{
+
+		private Logger log = Logger.getLogger(JMXTimer.class);
+		private ChunkReceiver receiver = null;
+		private JMXAdaptor adaptor = null;
+		private MBeanServerConnection mbsc = null;
+		//private long sendOffset = 0;
+		
+		public JMXTimer(ChunkReceiver receiver, JMXAdaptor adaptor, MBeanServerConnection mbsc){
+			this.receiver = receiver;
+			this.adaptor = adaptor;		
+			this.mbsc = mbsc;
+		}
+		
+		@SuppressWarnings("unchecked")
+		@Override
+		public void run() {
+			try{
+				ObjectName query = null;
+				if(!pattern.equals("")){
+					query = new ObjectName(pattern);			
+				}
+				Set<ObjectName> names = new TreeSet<ObjectName>(mbsc.queryNames(query, null));
+				Object val = null;
+				JSONObject json = new JSONObject();
+									
+				for (ObjectName oname: names) {			
+					MBeanInfo mbinfo = mbsc.getMBeanInfo(oname);
+					MBeanAttributeInfo [] mbinfos = mbinfo.getAttributes();						
+					
+					for (MBeanAttributeInfo mb: mbinfos) {
+						try{
+							Descriptor d = mb.getDescriptor();
+							val = mbsc.getAttribute(oname, mb.getName());
+							if(d.getFieldNames().length > 0){ //this is an open mbean
+								OpenType openType = (OpenType)d.getFieldValue("openType");	
+								
+								if(openType.isArray()){									
+									Object[] valarray = (Object[])val;									
+									val = Integer.toString(valarray.length);
+								}
+								else if(openType instanceof CompositeType){
+									CompositeData data = (CompositeData)val;
+									val = Integer.toString(data.values().size());									
+								}
+								else if(openType instanceof TabularType){
+									TabularData data = (TabularData)val;
+									val = Integer.toString(data.size());
+								}
+								//else it is SimpleType									
+							}
+							json.put(mb.getName(),val);
+						}
+						catch(Exception e){
+							log.warn("Exception "+ e.getMessage() +" getting attribute - "+mb.getName() + " Descriptor:"+mb.getDescriptor().getFieldNames().length);
+						}						
+					}
+				}
+				
+				byte[] data = json.toString().getBytes();		
+				sendOffset+=data.length;				
+				ChunkImpl c = new ChunkImpl(type, "JMX", sendOffset, data, adaptor);
+				long rightNow = Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+				c.addTag("timeStamp=\""+rightNow+"\"");
+				receiver.add(c);
+			}
+			catch(ConnectException e1){
+				log.error("Got connect exception for the existing MBeanServerConnection");
+				log.error(ExceptionUtil.getStackTrace(e1));
+				log.info("Make sure the target process is running. Retrying connection to JMX on port:"+port);
+				timer.cancel();
+				timer = null;
+				Thread connectThread = new Thread(new JMXConnect());
+				connectThread.start();
+			}
+			catch(Exception e){
+				log.error(ExceptionUtil.getStackTrace(e));
+			}
+			
+		}
+		
+	}
+	
+	
+	/**
+	 * Describes the adaptor as "type server port period pattern", i.e. the data type followed by
+	 * the same space-separated fields that parseArgs reads.
+	 *
+	 * @return the space-separated status string
+	 */
+	@Override
+	public String getCurrentStatus() {
+		return type + " " + server + " " + port + " " + period + " " + pattern;
+	}
+	
+	/**
+	 * Stops the adaptor: ends a connect-retry loop that may still be running, cancels the polling
+	 * timer and closes the JMX connector.
+	 *
+	 * @param shutdownPolicy the requested policy; only used for logging here
+	 * @return the current send offset
+	 * @throws AdaptorException if closing the JMX connector fails
+	 */
+	@Override
+	public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+			throws AdaptorException {
+		log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
+		//set the flag first so the connect thread stops retrying even if the close below fails
+		shutdown = true;
+		//cancel polling before closing the connector; use a local copy because the timer
+		//thread may set the field to null concurrently
+		Timer activeTimer = timer;
+		if(activeTimer != null){
+			activeTimer.cancel();
+		}
+		try {
+			if(jmxc != null){
+				jmxc.close();
+			}
+		} catch (IOException e) {
+			log.error("JMXAdaptor shutdown failed due to IOException");
+			throw new AdaptorException(ExceptionUtil.getStackTrace(e));
+		} catch (Exception e) {
+			log.error("JMXAdaptor shutdown failed");
+			throw new AdaptorException(ExceptionUtil.getStackTrace(e));
+		}
+		return sendOffset;
+	}
+
+	/**
+	 * Records the starting offset and launches a JMXConnect thread, which connects to the JMX
+	 * agent in the background.
+	 *
+	 * @param offset the offset to continue counting sent bytes from
+	 * @throws AdaptorException if the connect thread cannot be started
+	 */
+	@Override
+	public void start(long offset) throws AdaptorException {
+		try {
+			sendOffset = offset;
+			new Thread(new JMXConnect()).start();
+		} catch(Exception e) {
+			log.error("Failed to schedule JMX connect thread");
+			throw new AdaptorException(ExceptionUtil.getStackTrace(e));
+		}
+	}
+
+	@Override
+	public String parseArgs(String s) {
+		//JMXAdaptor MBeanServer port [interval] DomainNamePattern-Ex:"Hadoop:*"
+		String[] tokens = s.split(" ");
+		if(tokens.length == 4){
+			server = tokens[0];
+			port = tokens[1];
+			period = Integer.parseInt(tokens[2]);
+			pattern = tokens[3];
+		}
+		else if(tokens.length == 3){
+			server = tokens[0];
+			port = tokens[1];
+			pattern = tokens[2];
+		}
+		else{
+			log.warn("bad syntax in JMXAdaptor args");
+			return null;
+		}
+		String url_string = "service:jmx:rmi:///jndi/rmi://"+server+ ":"+port+"/jmxrmi";
+		try{
+			url = new JMXServiceURL(url_string);			
+			return s;
+		}
+		catch(Exception e){
+			log.error(ExceptionUtil.getStackTrace(e));
+		}
+		return null;		
+	}
+	
+}
+

Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/JMXAgent.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/JMXAgent.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/JMXAgent.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/JMXAgent.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.JMX;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+
+public class JMXAgent {
+	private MBeanServer mbs = null;
+	static JMXAgent agent = null;
+	private JMXAgent(){
+		mbs = ManagementFactory.getPlatformMBeanServer();
+	}
+	
+	public static MBeanServer getMBeanServerInstance(){
+		if(agent == null){
+			agent = new JMXAgent();
+		}
+		return agent.mbs;
+	}
+}
\ No newline at end of file

Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBean.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBean.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBean.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBean.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.JMX;
+
+import java.util.Map;
+
+/**
+ * Management interface of the test bean. The getters are meant to cover the open mbean type
+ * families SimpleType, ArrayType, TabularType and CompositeType.
+ */
+public interface MXBean {
+
+	/** @return an int attribute (SimpleType) */
+	int getInt();
+
+	/** @return a String attribute (SimpleType) */
+	String getString();
+
+	/** @return a String array attribute (ArrayType) */
+	String[] getStringArray();
+
+	/** @return a map attribute (presumably exposed as TabularType) */
+	Map<Integer,String> getMap();
+
+	/** @return a bean-style attribute (presumably exposed as CompositeType) */
+	QueueSample getCompositeType();
+
+}

Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBeanImpl.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBeanImpl.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBeanImpl.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/MXBeanImpl.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.JMX;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.Queue;
+
+public class MXBeanImpl implements MXBean {
+
+	private Queue<String> q;
+	private int i;
+	private String s;
+	private String[] sarray;
+	private Map<Integer, String> m;
+	
+	public MXBeanImpl() {
+		q = null;
+		i = -1;
+		s = null;
+		sarray = null;
+		m = null;
+	}
+	
+	public void setQueue(Queue<String> queue){
+		this.q = queue;
+	}
+	
+	public void setInt(int i) {
+		this.i = i;
+	}
+	
+	public void setString(String s) {
+		this.s = s;
+	}
+	
+	public void setStringArray(String[] sarray) {
+		this.sarray = sarray;
+	}
+	
+	public void setMap(Map<Integer, String> m) {
+		this.m = m;
+	}
+	
+	public int getInt() {
+		return i;
+	}
+
+	public String getString() {
+		return s;
+	}
+
+	public String[] getStringArray() {
+		return sarray;
+	}
+
+	public Map<Integer, String> getMap() {
+		return m;
+	}
+
+	public QueueSample getCompositeType() {
+		synchronized(q) {
+			return new QueueSample(new Date(), q.size(), q.peek());
+		}
+	}
+	
+}

Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/QueueSample.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/QueueSample.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/QueueSample.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/QueueSample.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.JMX;
+
+import java.util.Date;
+
+/**
+ * Value holder describing a queue at one point in time: when it was sampled, how many elements
+ * it held and which element was at its head.
+ */
+public class QueueSample {
+
+    private final Date date;
+    private final int size;
+    private final String head;
+
+    /**
+     * @param sampledAt the time the sample was taken
+     * @param queueSize the number of elements in the queue
+     * @param queueHead the element at the head of the queue
+     */
+    public QueueSample(Date sampledAt, int queueSize, String queueHead) {
+        date = sampledAt;
+        size = queueSize;
+        head = queueHead;
+    }
+
+    /** @return the time the sample was taken */
+    public Date getDate() {
+        return this.date;
+    }
+
+    /** @return the number of elements in the queue */
+    public int getSize() {
+        return this.size;
+    }
+
+    /** @return the element at the head of the queue */
+    public String getHead() {
+        return this.head;
+    }
+}

Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMX/TestJMXAdaptor.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.adaptor.JMX;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.conf.Configuration;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+import junit.framework.AssertionFailedError;
+import junit.framework.TestCase;
+
+public class TestJMXAdaptor extends TestCase{
+	MBeanServer mbs;
+	ChukwaAgent agent;
+	File baseDir, checkpointDir;
+	/**
+	 * Obtains the MBeanServer, prepares an empty checkpoint directory below test.build.data
+	 * (default /tmp) and starts a ChukwaAgent with checkpointing disabled on control port 9093
+	 * and http port 9090.
+	 */
+	@Override
+	protected void setUp() throws Exception {
+		super.setUp();
+		mbs = JMXAgent.getMBeanServerInstance();
+
+		String dataDir = System.getProperty("test.build.data", "/tmp");
+		baseDir = new File(dataDir).getCanonicalFile();
+		checkpointDir = new File(baseDir, "addAdaptorTestCheckpoints");
+		createEmptyDir(checkpointDir);
+
+		Configuration agentConf = new Configuration();
+		agentConf.set("chukwaAgent.checkpoint.dir", checkpointDir.getCanonicalPath());
+		agentConf.set("chukwaAgent.checkpoint.name", "checkpoint_");
+		agentConf.setInt("chukwaAgent.control.port", 9093);
+		agentConf.setInt("chukwaAgent.http.port", 9090);
+		agentConf.setBoolean("chukwaAgent.checkpoint.enabled", false);
+
+		agent = new ChukwaAgent(agentConf);
+	}
+	
+	/**
+	 * Registers a test mbean under "chukwa:type=test", adds a JMXAdaptor that polls
+	 * localhost:10100 for "chukwa:*" every 10s, and verifies the first chunk it emits:
+	 * container-typed attributes must be reported as their sizes and simple attributes as
+	 * their values. The chunk is collected on a separate thread so the test can give up
+	 * after 20s.
+	 */
+	public void testJMXAdaptor() {
+		MXBeanImpl mxbean = null;
+		try{
+			mxbean = new MXBeanImpl();
+			ObjectName name = new ObjectName("chukwa:type=test");
+			mbs.registerMBean(mxbean, name);
+		} catch(Exception e){
+			e.printStackTrace();
+			fail("Failed to instantiate and register test mbean");
+		}
+
+		Map<Integer, String> m = new HashMap<Integer, String>() {
+			private static final long serialVersionUID = 1L;
+			{
+				put(1, "a");
+				put(2, "b");
+				put(3, "c");
+			}
+		};
+		Queue<String> queue = new ArrayBlockingQueue<String>(10);
+
+		queue.add("Message1");
+		queue.add("Message2");
+		queue.add("Message3");
+
+		String[] sarray = new String[] {"Screw", "you", "guys", "I'm", "going", "home"};
+
+		mxbean.setQueue(queue);
+		mxbean.setInt(20);
+		mxbean.setMap(m);
+		mxbean.setString("TestString");
+		mxbean.setStringArray(sarray);
+
+		assertEquals(0, agent.adaptorCount());
+		System.out.println("adding jmx adaptor");
+		String id = agent.processAddCommand("add JMXAdaptor DebugProcessor localhost 10100 10 chukwa:* 0");
+		assertEquals(1, agent.adaptorCount());
+
+		//A thread that can block on ChunkQueue and can be interrupted
+		class Collector implements Runnable {
+			String fail = null;
+			public String getFailMessage(){
+				return fail;
+			}
+			public void run(){
+				try {
+					ChunkQueue eventQueue = DataFactory.getInstance().getEventQueue();
+					List<Chunk> evts = new ArrayList<Chunk>();
+					eventQueue.collect(evts, 1);
+
+					// Expected - {"CompositeType":"3","String":"TestString","StringArray":"6","Map":"3","Int":20}
+
+					for (Chunk e : evts) {
+						String data = new String(e.getData());
+						JSONObject obj = (JSONObject) JSONValue.parse(data);
+						//expected value first, so failure messages read correctly
+						assertEquals("3", obj.get("CompositeType"));
+						assertEquals("TestString", obj.get("String"));
+						assertEquals("6", obj.get("StringArray"));
+						assertEquals("3", obj.get("Map"));
+						//String.valueOf turns a missing attribute into an assertion failure, not an NPE
+						assertEquals("20", String.valueOf(obj.get("Int")));
+						System.out.println("Verified all data collected by JMXAdaptor");
+					}
+				} catch (InterruptedException e1) {
+					e1.printStackTrace();
+					fail = "JMXAdaptor failed to collect all data; it was interrupted";
+				} catch (AssertionFailedError e2) {
+					e2.printStackTrace();
+					fail = "Assert failed while verifying JMX data- "+e2.getMessage();
+				} catch (Exception e3) {
+					e3.printStackTrace();
+					fail = "Exception in collector thread. Check the test output for stack trace";
+				}
+			}
+		}
+
+		try {
+			Collector worker = new Collector();
+			Thread t = new Thread(worker);
+			t.start();
+			t.join(20000);
+			if(t.isAlive()){
+				t.interrupt();
+				fail("JMXAdaptor failed to collect data after 20s. Check agent log and surefire report");
+			}
+			String failMessage = worker.getFailMessage();
+			if(failMessage != null){
+				fail(failMessage);
+			}
+
+		} catch(Exception e){
+			e.printStackTrace();
+			fail("Exception in TestJMXAdaptor");
+		}
+
+		System.out.println("shutting down jmx adaptor");
+		agent.stopAdaptor(id, true);
+		assertEquals(0, agent.adaptorCount());
+	}
+	
+	 //returns true if dir exists
+	  public static boolean nukeDirContents(File dir) {
+	    if(dir.exists()) {
+	      if(dir.isDirectory()) {
+	        for(File f: dir.listFiles()) {
+	          nukeDirContents(f);
+	          f.delete();
+	        }
+	      } else
+	        dir.delete();
+	      
+	      return true;
+	    }
+	    return false;
+	  }
+	  
+	  public static void createEmptyDir(File dir) {
+	    if(!nukeDirContents(dir))
+	      dir.mkdir();
+	    assertTrue(dir.isDirectory() && dir.listFiles().length == 0);
+	  }
+	
+	/**
+	 * Removes the checkpoint directory created in setUp, including its contents.
+	 */
+	@Override
+	protected void tearDown() throws Exception {
+		// empty the directory first; File.delete only removes empty directories
+		nukeDirContents(checkpointDir);
+	    checkpointDir.delete();	   
+		super.tearDown();
+	}
+}

Added: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java?rev=1423264&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java (added)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestAddAdaptor.java Tue Dec 18 02:59:55 2012
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.conf.Configuration;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+public class TestAddAdaptor extends TestCase {
+	ChukwaAgent agent = null;
+	File baseDir;
+	  
+	public void testJmxAdd() throws IOException,
+	  ChukwaAgent.AlreadyRunningException, InterruptedException {
+		Configuration conf = new Configuration();
+	    baseDir = new File(System.getProperty("test.build.data", "/tmp")).getCanonicalFile();
+	    File checkpointDir = new File(baseDir, "addAdaptorTestCheckpoints");
+	    createEmptyDir(checkpointDir);
+	    
+	    conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getCanonicalPath());
+	    conf.set("chukwaAgent.checkpoint.name", "checkpoint_");
+	    conf.setInt("chukwaAgent.control.port", 0);
+	    conf.setInt("chukwaAgent.http.port", 9090);
+	    conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
+	    
+	    agent = new ChukwaAgent(conf);
+	    	    
+	    assertEquals(0, agent.adaptorCount());
+	    System.out.println("adding jmx adaptor");
+	    String id = agent.processAddCommand("add JMXAdaptor DebugProcessor localhost 0 60 hadoop:* 0");
+	    assertEquals(1, agent.adaptorCount());
+	    
+	    System.out.println("shutting down jmx adaptor");
+	    agent.stopAdaptor(id, true);
+	    assertEquals(0, agent.adaptorCount());
+	    
+	    String rest_url = "http://localhost:9090/rest/v1/adaptor";
+	    System.out.println("adding jmx adaptor using rest url - " + rest_url);
+	    
+	    String dataType = "DebugProcessor", adaptorClass = "JMXAdaptor", adaptorParams = "localhost 0 60 hadoop:*", offset = "0";
+	    String adaptor_json = "{\"DataType\":\""+dataType+"\", \"AdaptorClass\":\""+adaptorClass+"\", \"AdaptorParams\" : \""+adaptorParams+"\", \"Offset\" : \""+offset+"\" }";
+	    System.out.println(adaptor_json);
+	    Client client = Client.create();
+	    WebResource resource = client.resource(rest_url);
+	    ClientResponse response = resource.type("application/json")
+	 		   .post(ClientResponse.class, adaptor_json);
+	    if (response.getStatus() != 200 && response.getStatus() != 201) {
+			fail("Add adaptor through REST failed : HTTP error code : " + response.getStatus());
+		}
+	    assertEquals(1, agent.adaptorCount());
+	    String result = response.getEntity(String.class);	    
+	    
+	    try{
+	    	InputStream inputStream = new ByteArrayInputStream(result.getBytes());
+		    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+			DocumentBuilder builder = factory.newDocumentBuilder();
+			Document doc = builder.parse(inputStream); 
+			NodeList list = doc.getElementsByTagName("Adaptor");
+			Node node = list.item(0);
+			id = node.getAttributes().getNamedItem("id").getNodeValue();
+	    } catch(Exception e){
+	    	fail("Failed to parse response from add. Complete response is:\n"+result);
+	    }
+	    
+    	
+    	System.out.println("shutting down jmx adaptor with id:"+id+" through rest");
+	    resource = client.resource(rest_url+"/"+id);
+	    response = resource.delete(ClientResponse.class);
+	    if (response.getStatus() != 200 && response.getStatus() != 201) {
+			fail("Delete adaptor through REST failed : HTTP error code : " + response.getStatus());
+		}
+	    
+	    assertEquals(0, agent.adaptorCount());
+	    
+	    agent.shutdown();	    
+	    Thread.sleep(1500);	    
+	    nukeDirContents(checkpointDir);
+	    checkpointDir.delete();	    
+	}
+	
+	protected void tearDown(){
+		if(agent != null){
+			agent.shutdown();
+		}
+	}
+	
+	 //returns true if dir exists
+	  public static boolean nukeDirContents(File dir) {
+	    if(dir.exists()) {
+	      if(dir.isDirectory()) {
+	        for(File f: dir.listFiles()) {
+	          nukeDirContents(f);
+	          f.delete();
+	        }
+	      } else
+	        dir.delete();
+	      
+	      return true;
+	    }
+	    return false;
+	  }
+	  
+	  public static void createEmptyDir(File dir) {
+	    if(!nukeDirContents(dir))
+	      dir.mkdir();
+	    assertTrue(dir.isDirectory() && dir.listFiles().length == 0);
+	  }
+}