Posted to commits@chukwa.apache.org by ey...@apache.org on 2014/06/30 04:13:10 UTC

svn commit: r1606617 - in /chukwa/trunk: ./ bin/ conf/ src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/ src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/ src/main/java/org/apache/hadoop/chukwa/datacollection/agent...

Author: eyang
Date: Mon Jun 30 02:13:10 2014
New Revision: 1606617

URL: http://svn.apache.org/r1606617
Log:
CHUKWA-674. Integrated Chukwa collector feature into Chukwa Agent.  (Eric Yang)

Added:
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
Modified:
    chukwa/trunk/CHANGES.txt
    chukwa/trunk/bin/chukwa
    chukwa/trunk/conf/chukwa-agent-conf.xml
    chukwa/trunk/conf/hbase.schema
    chukwa/trunk/pom.xml
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SystemMetrics.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSyslogAdaptor.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/dataloader/TestSocketDataLoader.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/rest/resource/TestClientTrace.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java

Modified: chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/chukwa/trunk/CHANGES.txt?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/CHANGES.txt (original)
+++ chukwa/trunk/CHANGES.txt Mon Jun 30 02:13:10 2014
@@ -12,6 +12,8 @@ Release 0.6 - Unreleased
 
   NEW FEATURES
 
+    CHUKWA-674. Integrated Chukwa collector feature into Chukwa Agent.  (Eric Yang)
+
     CHUKWA-705. Updated Chukwa to support JDK7 and updated to Hadoop 1.2.1 and HBase 0.96.1.1.  (Eric Yang)
 
     CHUKWA-699. Updated timeline widget stylesheet.  (Eric Yang)

Modified: chukwa/trunk/bin/chukwa
URL: http://svn.apache.org/viewvc/chukwa/trunk/bin/chukwa?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/bin/chukwa (original)
+++ chukwa/trunk/bin/chukwa Mon Jun 30 02:13:10 2014
@@ -161,6 +161,11 @@ elif [ "$COMMAND" = "tail" ]; then
 fi
 
 pid="${CHUKWA_PID_DIR}/$PID.pid"
+
+if [ "$1" = "start" ]; then
+  shift
+fi
+
 if [ "$1" = "stop" ]; then
   if [ -e $pid ]; then
     TARGET_PID=`cat $pid`
@@ -169,23 +174,29 @@ if [ "$1" = "stop" ]; then
       sleep $CHUKWA_STOP_TIMEOUT
       if kill -0 $TARGET_PID > /dev/null 2>&1; then
         kill -9 $TARGET_PID
+        exit 1
       fi
     fi
   else 
     echo "Cannot find PID file - $PID.pid; NO $PID to stop";
   fi
+  exit 0
 elif [ -f $pid ]; then
  if kill -0 `cat $pid` > /dev/null 2>&1; then
     echo $command running as process `cat $pid`.  Stop it first.
     exit 1
+ else
+    # pid file exists, but process is dead.
+    echo $command is not running, but pid file exists.
+    rm -f $pid
  fi
-else 
-  # run command
-  if [ "$BACKGROUND" = "false" ]; then
-    ${JAVA_HOME}/bin/java ${JAVA_OPT} -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DCHUKWA_DATA_DIR=${CHUKWA_DATA_DIR} -DAPP=${APP} -Dlog4j.configuration=chukwa-log4j.properties -classpath ${CHUKWA_CONF_DIR}:${CLASSPATH}:${CHUKWA_CLASSPATH}:${tools} ${CLASS} $OPTS $@
-  else
-    exec ${JAVA_HOME}/bin/java ${JAVA_OPT} -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DCHUKWA_DATA_DIR=${CHUKWA_DATA_DIR} -DAPP=${APP} -Dlog4j.configuration=chukwa-log4j.properties -classpath ${CHUKWA_CONF_DIR}:${CLASSPATH}:${CHUKWA_CLASSPATH}:${tools} ${CLASS} $OPTS $@ &
-    sleep 1
-  fi
+fi
+
+# run command
+if [ "$BACKGROUND" = "false" ]; then
+  ${JAVA_HOME}/bin/java ${JAVA_OPT} -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DCHUKWA_DATA_DIR=${CHUKWA_DATA_DIR} -DAPP=${APP} -Dlog4j.configuration=chukwa-log4j.properties -classpath ${CHUKWA_CONF_DIR}:${CLASSPATH}:${CHUKWA_CLASSPATH}:${tools} ${CLASS} $OPTS $@
+else
+  exec ${JAVA_HOME}/bin/java ${JAVA_OPT} -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DCHUKWA_DATA_DIR=${CHUKWA_DATA_DIR} -DAPP=${APP} -Dlog4j.configuration=chukwa-log4j.properties -classpath ${CHUKWA_CONF_DIR}:${CLASSPATH}:${CHUKWA_CLASSPATH}:${tools} ${CLASS} $OPTS $@ &
+  sleep 1
 fi
 

Modified: chukwa/trunk/conf/chukwa-agent-conf.xml
URL: http://svn.apache.org/viewvc/chukwa/trunk/conf/chukwa-agent-conf.xml?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/conf/chukwa-agent-conf.xml (original)
+++ chukwa/trunk/conf/chukwa-agent-conf.xml Mon Jun 30 02:13:10 2014
@@ -72,12 +72,18 @@
 
   <property>
     <name>chukwaAgent.collector.retryInterval</name>
-    <value>20000</value>
+    <value>500</value>
     <description>the number of milliseconds to wait between searches for a collector</description>
   </property>
 
   <property>
+    <name>chukwa.pipeline</name>
+    <value>org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter</value>
+  </property>
+
+  <property>
     <name>syslog.adaptor.port.9095.facility.LOCAL1</name>
     <value>HADOOP</value>
   </property>
+
 </configuration>

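Note: the new chukwa.pipeline property tells the agent-side PipelineConnector which writer chain to drive (see PipelineStageWriter below). A minimal sketch of reading it outside the agent, assuming CHUKWA_CONF_DIR points at the directory containing chukwa-agent-conf.xml; the demo class name is made up:

    import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
    import org.apache.hadoop.conf.Configuration;

    public class PipelinePropertyDemo {
      public static void main(String[] args) {
        // ChukwaConfiguration resolves chukwa-agent-conf.xml via CHUKWA_CONF_DIR.
        Configuration conf = new ChukwaConfiguration();
        // Same default that this commit wires into the configuration file.
        String pipeline = conf.get("chukwa.pipeline",
            "org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter");
        // Stages are comma-separated, head to tail.
        for (String stage : pipeline.split(",")) {
          System.out.println("pipeline stage: " + stage);
        }
      }
    }
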
Modified: chukwa/trunk/conf/hbase.schema
URL: http://svn.apache.org/viewvc/chukwa/trunk/conf/hbase.schema?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/conf/hbase.schema (original)
+++ chukwa/trunk/conf/hbase.schema Mon Jun 30 02:13:10 2014
@@ -19,25 +19,25 @@ create "Jobs",
 {NAME => "summary" }
 create "SystemMetrics", 
 {NAME => "cpu", VERSIONS => 65535},
-{NAME => "system", VERSION => 65535},
-{NAME => "disk", VERSION => 65535},
-{NAME => "memory", VERSION => 65535},
-{NAME => "swap", VERSION => 65535},
-{NAME => "network", VERSION => 65535},
-{NAME => "tags", VERSION => 65535}
+{NAME => "system", VERSIONS => 65535},
+{NAME => "disk", VERSIONS => 65535},
+{NAME => "memory", VERSIONS => 65535},
+{NAME => "swap", VERSIONS => 65535},
+{NAME => "network", VERSIONS => 65535},
+{NAME => "tags", VERSIONS => 65535}
 create "ClusterSummary", 
 {NAME=> "cpu", VERSIONS => 65535},
-{NAME => "system", VERSION => 65535},
-{NAME => "disk", VERSION => 65535},
-{NAME => "memory", VERSION => 65535},
-{NAME => "network", VERSION => 65535},
-{NAME => "swap", VERSION => 65535},
-{NAME => "hdfs", VERSION => 65535},
-{NAME => "mapreduce", VERSION => 65535}
+{NAME => "system", VERSIONS => 65535},
+{NAME => "disk", VERSIONS => 65535},
+{NAME => "memory", VERSIONS => 65535},
+{NAME => "network", VERSIONS => 65535},
+{NAME => "swap", VERSIONS => 65535},
+{NAME => "hdfs", VERSIONS => 65535},
+{NAME => "mapreduce", VERSIONS => 65535}
 create "chukwa", 
 {NAME=>"chukwaAgent_chunkQueue", VERSIONS => 65535},
-{NAME => "chukwaAgent_metrics", VERSION => 65535},
-{NAME => "chukwaAgent_httpSender", VERSION => 65535}
+{NAME => "chukwaAgent_metrics", VERSIONS => 65535},
+{NAME => "chukwaAgent_httpSender", VERSIONS => 65535}
 create "HBase",
 {NAME => "master", VERSIONS => 65535},
 {NAME => "regionserver", VERSIONS => 65535}

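Note: VERSIONS is the keyword the HBase shell recognizes; the misspelled VERSION left those families without the intended 65535 retained versions. A hedged Java check of what a family actually retains, assuming a reachable cluster and the SystemMetrics table from the schema above (the demo class is hypothetical):

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public class SchemaVersionsCheck {
      public static void main(String[] args) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
        // Print the retained-version count of each family on SystemMetrics.
        HTableDescriptor desc = admin.getTableDescriptor("SystemMetrics".getBytes());
        for (HColumnDescriptor family : desc.getColumnFamilies()) {
          System.out.println(family.getNameAsString()
              + " VERSIONS=" + family.getMaxVersions());
        }
        admin.close();
      }
    }
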
Modified: chukwa/trunk/pom.xml
URL: http://svn.apache.org/viewvc/chukwa/trunk/pom.xml?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/pom.xml (original)
+++ chukwa/trunk/pom.xml Mon Jun 30 02:13:10 2014
@@ -28,7 +28,7 @@
         <package.pid.dir>/var/run/chukwa</package.pid.dir>
         <package.release>1</package.release>
         <package.version>0.6.0</package.version>
-        <final.name>${project.artifactId}-incubating-${package.version}</final.name>
+        <final.name>${project.artifactId}-${package.version}</final.name>
         <test.build.dir>${basedir}/target/test</test.build.dir>
         <test.build.data>${test.build.dir}/data</test.build.data>
         <test.cache.data>${test.build.dir}/cache</test.cache.data>
@@ -118,11 +118,9 @@
             <systemPath>${basedir}/lib/confspellcheck.jar</systemPath>
           </dependency>
           <dependency>
-            <groupId>com.mdimension</groupId>
+            <groupId>com.rubiconproject.oss</groupId>
             <artifactId>jchronic</artifactId>
-            <version>0.2.3</version>
-            <scope>system</scope>
-            <systemPath>${basedir}/lib/jchronic-0.2.3.jar</systemPath>
+            <version>0.2.6</version>
           </dependency>
           <dependency>
             <groupId>org.apache.log4j</groupId>
@@ -132,11 +130,9 @@
             <systemPath>${basedir}/lib/NagiosAppender-1.5.0.jar</systemPath>
           </dependency>
           <dependency>
-            <groupId>org.hyperic</groupId>
+            <groupId>org.fusesource</groupId>
             <artifactId>sigar</artifactId>
             <version>1.6.4</version>
-            <scope>system</scope>
-            <systemPath>${basedir}/lib/sigar.jar</systemPath>
           </dependency>
           <dependency>
             <groupId>org.json</groupId>
@@ -271,7 +267,7 @@
           <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
-            <version>10.0.1</version>
+            <version>12.0.1</version>
             <exclusions>
               <exclusion>
                 <groupId>com.google.code.findbugs</groupId>
@@ -403,29 +399,6 @@
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-antrun-plugin</artifactId>
-                <version>1.6</version>
-                <configuration>
-                    <encoding>UTF-8</encoding>
-                </configuration>
-                <executions>
-                        <execution>
-                        <id>chmod-jmx-file</id>
-                        <phase>process-resources</phase>
-                        <configuration>
-                            <tasks name="setup">
-                                <chmod file="target/conf/jmxremote.password" perm="600" />
-                                <chmod file="target/conf/jmxremote.access" perm="600" />
-                            </tasks>
-                        </configuration>
-                        <goals>
-                            <goal>run</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-jar-plugin</artifactId>
                 <version>2.3.1</version>
                 <executions>
@@ -462,6 +435,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
                 <executions>
                   <execution>
                     <id>default-compile</id>
@@ -470,7 +444,9 @@
                       <goals>
                         <goal>compile</goal>
                       </goals>
-                      <compilerVersion>1.7</compilerVersion>
+                      <!--<compilerArgument>-Xlint:unchecked</compilerArgument>
+                      <compilerArgument>-Xlint:deprecation</compilerArgument>-->
+                      <compilerVersion>1.6</compilerVersion>
                       <source>1.6</source>
                       <target>1.6</target>
                       <excludes>
@@ -485,6 +461,8 @@
                       <goals>
                         <goal>testCompile</goal>
                       </goals>
+                      <!--<compilerArgument>-Xlint:unchecked</compilerArgument>
+                      <compilerArgument>-Xlint:deprecation</compilerArgument>-->
                       <compilerVersion>1.7</compilerVersion>
                       <source>1.6</source>
                       <target>1.6</target>
@@ -514,7 +492,7 @@
                 <artifactId>maven-surefire-plugin</artifactId>
                 <version>2.10</version>
                 <configuration>
-                    <skip>true</skip>
+                    <skip>false</skip>
                 </configuration>
                 <executions>
                     <execution>
@@ -524,7 +502,6 @@
                             <goal>test</goal>
                         </goals>
                         <configuration>
-                            <skip>false</skip>
                             <argLine>-Xmx1024m -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=true -Dcom.sun.management.jmxremote.password.file=${basedir}/target/conf/jmxremote.password -Dcom.sun.management.jmxremote.access.file=${basedir}/target/conf/jmxremote.access -Dcom.sun.management.jmxremote.port=10100</argLine>
                             <reportsDirectory>${project.build.directory}/test-reports</reportsDirectory>
                             <forkMode>pertest</forkMode>
@@ -606,6 +583,7 @@
                     <phase>package</phase>
                     <warName>hicc</warName>
                     <warSourceDirectory>src/main/web/hicc</warSourceDirectory>
+                    <webappDirectory>${project.build.directory}/hicc</webappDirectory>
                 </configuration>
                 <executions>
                     <execution>
@@ -645,12 +623,12 @@
                         <id>setup</id>
                         <phase>validate</phase>
                         <configuration>
-                            <tasks name="setup">
+                            <target>
                                 <mkdir dir="${basedir}/target"/>
                                 <echo message="${VERSION}" file="${basedir}/target/VERSION"/>
                                 <mkdir dir="${basedir}/target/clover"/>
                                 <chmod dir="${basedir}/target/clover" perm="a+w" />
-                            </tasks>
+                            </target>
                         </configuration>
                         <goals>
                             <goal>run</goal>
@@ -660,10 +638,10 @@
                         <id>chmod-jmx-file</id>
                         <phase>process-resources</phase>
                         <configuration>
-                            <tasks name="setup">
+                            <target>
                                 <chmod file="target/conf/jmxremote.password" perm="600" />
                                 <chmod file="target/conf/jmxremote.access" perm="600" />
-                            </tasks>
+                            </target>
                         </configuration>
                         <goals>
                             <goal>run</goal>
@@ -673,7 +651,7 @@
                         <id>test-setup</id>
                         <phase>generate-test-resources</phase>
                         <configuration>
-                            <tasks name="test-setup">
+                            <target>
                                 <delete dir="/tmp/chukwa/hicc" />
                                 <mkdir dir="${test.build.dir}/var" />
                                 <mkdir dir="${test.build.dir}/var/log" />
@@ -709,12 +687,11 @@
                                         <expandproperties/>
                                     </filterchain>
                                 </copy>
-                                <copy file="${basedir}/src/test/resources/hbase-site.xml" tofile="${test.build.dir}/classes/hbase-site.xml"></copy>
                                 <copy file="${basedir}/conf/log4j.properties" tofile="${test.build.dir}/conf/log4j.properties"></copy>
                                 <copy file="${basedir}/conf/auth.conf" tofile="${test.build.dir}/conf/auth.conf"></copy>
                                 <chmod file="${test.build.dir}/conf/jmxremote.password" perm="600" />
                                 <chmod file="${test.build.dir}/conf/jmxremote.access" perm="600" />
-                            </tasks>
+                            </target>
                         </configuration>
                         <goals>
                             <goal>run</goal>
@@ -861,7 +838,7 @@
                                     <goal>single</goal>
                                 </goals>
                                 <configuration>
-                                    <finalName>chukwa-incubating-src-${VERSION}</finalName>
+                                    <finalName>chukwa-src-${VERSION}</finalName>
                                     <tarLongFileMode>gnu</tarLongFileMode>
                                     <descriptors>
                                         <descriptor>src/packages/tarball/src.xml</descriptor>
@@ -884,12 +861,11 @@
             <id>hbase-0.96</id>
             <activation>
                 <property>
-                    <name>hbase.profile</name>
-                    <value>0.96</value>
+                    <name>!hbase.profile</name>
                 </property>
             </activation>
             <properties>
-                <hbase.version>0.96.1.1-hadoop1</hbase.version>
+                <hbase.version>0.96.2-hadoop1</hbase.version>
             </properties>
             <dependencies>
                 <dependency>
@@ -982,15 +958,24 @@
                         </exclusion>
                    </exclusions>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-hadoop-compat</artifactId>
+                    <classifier>tests</classifier>
+                    <version>${hbase.version}</version>
+                    <scope>test</scope>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-hadoop1-compat</artifactId>
+                    <classifier>tests</classifier>
+                    <version>${hbase.version}</version>
+                    <scope>test</scope>
+                </dependency>
             </dependencies>
         </profile>
         <profile>
             <id>hbase-0.94</id>
-            <activation>
-            <property>
-                <name>!hbase.profile</name>
-            </property>
-            </activation>
             <properties>
                 <hbase.version>0.94.9</hbase.version>
             </properties>
@@ -1082,21 +1067,10 @@
 
     <repositories>
         <repository>
-            <id>maven2-repository.dev.java.net</id>
-            <name>Java.net Repository for Maven</name>
-            <url>http://download.java.net/maven/2/</url>
-            <layout>default</layout>
-        </repository>
-        <repository>
             <id>codehaus</id>
             <url>http://repository.codehaus.org/</url>
         </repository>
         <repository>
-            <id>Sonatype-public</id>
-            <name>SnakeYAML repository</name>
-            <url>http://oss.sonatype.org/content/groups/public/</url>
-        </repository>
-        <repository>
             <id>clojars</id>
             <url>http://clojars.org/repo/</url>
         </repository>
@@ -1137,7 +1111,6 @@
             <artifactId>findbugs-maven-plugin</artifactId>
             <version>2.3.3</version>
             <configuration>
-<!--                <onlyAnalyze>org.apache.hadoop.chukwa.datacollection.*</onlyAnalyze> -->
                 <threshold>Normal</threshold>
                 <effort>Max</effort>
             </configuration>

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java Mon Jun 30 02:13:10 2014
@@ -18,16 +18,11 @@
  
 package org.apache.hadoop.chukwa.analysis.salsa.visualization;
 
-import prefuse.data.io.sql.*;
-import prefuse.data.expression.parser.*;
-import prefuse.data.expression.*;
-import prefuse.data.column.*;
-import prefuse.data.query.*;
+
 import prefuse.data.*;
 import prefuse.action.*;
 import prefuse.action.layout.*;
 import prefuse.action.assignment.*;
-import prefuse.visual.expression.*;
 import prefuse.visual.*;
 import prefuse.render.*;
 import prefuse.util.*;

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java Mon Jun 30 02:13:10 2014
@@ -383,7 +383,7 @@ public class Swimlanes {
       VisualItem item = null;
       SwimlanesStatePalette pal = new SwimlanesStatePalette();
       
-      Iterator curr_group_items = this.m_vis.items(this.m_group);
+      Iterator<?> curr_group_items = this.m_vis.items(this.m_group);
           
       while (curr_group_items.hasNext()) {
         item = (VisualItem) curr_group_items.next();

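Note: the raw Iterator becomes Iterator<?> because prefuse's Visualization.items() is effectively untyped; the unbounded wildcard keeps the explicit cast without raw-type warnings. A tiny self-contained illustration (names are made up):

    import java.util.Arrays;
    import java.util.Iterator;

    public class WildcardIteratorDemo {
      public static void main(String[] args) {
        // An effectively untyped source, like Visualization.items().
        Iterator<?> items = Arrays.asList((Object) "a", "b").iterator();
        while (items.hasNext()) {
          String item = (String) items.next(); // explicit cast, as in Swimlanes
          System.out.println(item);
        }
      }
    }
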
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SystemMetrics.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SystemMetrics.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SystemMetrics.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SystemMetrics.java Mon Jun 30 02:13:10 2014
@@ -41,7 +41,7 @@ import org.apache.log4j.Logger;
 
 public class SystemMetrics extends AbstractAdaptor {
   static Logger log = Logger.getLogger(SystemMetrics.class);
-  private long period = 60 * 1000;
+  private long period = 5 * 1000;
   private SigarRunner runner;
   private Timer timer;
   
@@ -50,14 +50,17 @@ public class SystemMetrics extends Abstr
     int spOffset = args.indexOf(' ');
     if (spOffset > 0) {
       try {
-        period = Integer.parseInt(args.substring(0, spOffset));
+        period = Long.parseLong(args.substring(0, spOffset));
         period = period * 1000;
+        start(spOffset);
       } catch (NumberFormatException e) {
         StringBuilder buffer = new StringBuilder();
         buffer.append("SystemMetrics: sample interval ");
         buffer.append(args.substring(0, spOffset));
         buffer.append(" can't be parsed.");
         log.warn(buffer.toString());
+      } catch (AdaptorException e) {
+        log.warn("Error parsing parameter for SystemMetrics adaptor.");
       }
     }    
     return args;

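Note: parsing the interval with Long.parseLong matches the long period field and survives intervals too large for an int. A standalone sketch of the same substring-and-scale logic (the argument string is a made-up example):

    public class IntervalParseDemo {
      public static void main(String[] args) {
        String adaptorArgs = "2200000000 0"; // hypothetical: too large for Integer.parseInt
        int spOffset = adaptorArgs.indexOf(' ');
        // Integer.parseInt would throw NumberFormatException on this value;
        // Long.parseLong parses it, then the value is scaled to milliseconds.
        long period = Long.parseLong(adaptorArgs.substring(0, spOffset));
        period = period * 1000;
        System.out.println("sampling period: " + period + " ms");
      }
    }
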
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Mon Jun 30 02:13:10 2014
@@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.datacollection.DataFactory;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
@@ -45,6 +46,7 @@ import org.apache.hadoop.chukwa.datacoll
 import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector;
 import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
 import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
@@ -52,13 +54,14 @@ import org.apache.hadoop.chukwa.util.Exc
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
-
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.mortbay.jetty.nio.SelectChannelConnector;
 import org.mortbay.thread.BoundedThreadPool;
+
 import com.sun.jersey.spi.container.servlet.ServletContainer;
+
 import edu.berkeley.confspell.*;
 
 /**
@@ -79,21 +82,129 @@ public class ChukwaAgent implements Adap
   // boolean WRITE_CHECKPOINTS = true;
   static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "metrics");
 
+  private static Logger log = Logger.getLogger(ChukwaAgent.class);
   private static final int HTTP_SERVER_THREADS = 120;
   private static Server jettyServer = null;
   private OffsetStatsManager adaptorStatsManager = null;
   private Timer statsCollector = null;
+  private static volatile Configuration conf = null;
+  private static volatile ChukwaAgent agent = null;
+  public Connector connector = null;
+  
+  protected ChukwaAgent() throws AlreadyRunningException {
+    this(new ChukwaConfiguration());
+  }
+
+  public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
+    ChukwaAgent.agent = this;
+    this.conf = conf;
+    
+    // almost always just reading this; so use a ConcurrentHM.
+    // since we wrapped the offset, it's not a structural mod.
+    adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
+    adaptorsByName = new HashMap<String, Adaptor>();
+    checkpointNumber = 0;
+
+    boolean DO_CHECKPOINT_RESTORE = conf.getBoolean(
+        "chukwaAgent.checkpoint.enabled", true);
+    CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
+        "chukwa_checkpoint_");
+    final int CHECKPOINT_INTERVAL_MS = conf.getInt(
+        "chukwaAgent.checkpoint.interval", 5000);
+    final int STATS_INTERVAL_MS = conf.getInt(
+        "chukwaAgent.stats.collection.interval", 10000);
+    final int STATS_DATA_TTL_MS = conf.getInt(
+        "chukwaAgent.stats.data.ttl", 1200000);
+
+    if (conf.get("chukwaAgent.checkpoint.dir") != null)
+      checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
+    else
+      DO_CHECKPOINT_RESTORE = false;
+
+    if (checkpointDir != null && !checkpointDir.exists()) {
+      checkpointDir.mkdirs();
+    }
+    tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
+    DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\""));
+
+    log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
+    log.info("Config - checkpointDir: [" + checkpointDir + "]");
+    log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
+        + "]");
+    log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
+    log.info("Config - STATS_INTERVAL_MS: [" + STATS_INTERVAL_MS + "]");
+    log.info("Config - tags: [" + tags + "]");
+
+    if (DO_CHECKPOINT_RESTORE) {
+      log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
+    }
+
+    File initialAdaptors = null;
+    if (conf.get("chukwaAgent.initial_adaptors") != null)
+      initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));
+
+    try {
+      if (DO_CHECKPOINT_RESTORE) {
+        restoreFromCheckpoint();
+      }
+    } catch (IOException e) {
+      log.warn("failed to restart from checkpoint: ", e);
+    }
+
+    try {
+      if (initialAdaptors != null && initialAdaptors.exists())
+        readAdaptorsFile(initialAdaptors); 
+    } catch (IOException e) {
+      log.warn("couldn't read user-specified file "
+          + initialAdaptors.getAbsolutePath());
+    }
+
+    controlSock = new AgentControlSocketListener(this);
+    try {
+      controlSock.tryToBind(); // do this synchronously; if it fails, we know
+      // another agent is running.
+      controlSock.start(); // this sets us up as a daemon
+      log.info("control socket started on port " + controlSock.portno);
+
+      // start the HTTP server with stats collection
+      try {
+        this.adaptorStatsManager = new OffsetStatsManager(STATS_DATA_TTL_MS);
+        this.statsCollector = new Timer("ChukwaAgent Stats Collector");
+
+        startHttpServer(conf);
+
+        statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
+                STATS_INTERVAL_MS, STATS_INTERVAL_MS);
+      } catch (Exception e) {
+        log.error("Couldn't start HTTP server", e);
+        throw new RuntimeException(e);
+      }
+
+      // shouldn't start checkpointing until we're finishing launching
+      // adaptors on boot
+      if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir != null) {
+        checkpointer = new Timer();
+        checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
+      }
+    } catch (IOException e) {
+      log.info("failed to bind to socket; aborting agent launch", e);
+      throw new AlreadyRunningException();
+    }
 
-  static Logger log = Logger.getLogger(ChukwaAgent.class);
-  static ChukwaAgent agent = null;
+  }
 
   public static ChukwaAgent getAgent() {
+    if(agent == null) {
+      try {
+        agent = new ChukwaAgent();
+      } catch(AlreadyRunningException e) {
+        log.error("Chukwa Agent is already running", e);
+        agent = null;
+      }
+    } 
     return agent;
   }
 
-  static Configuration conf = null;
-  Connector connector = null;
-
   // doesn't need an equals(), comparator, etc
   public static class Offset {
     public Offset(long l, String id) {
@@ -177,21 +288,24 @@ public class ChukwaAgent implements Adap
 
       int uriArgNumber = 0;
       if (args.length > 0) {
-        if (args[uriArgNumber].equals("local"))
+        if (args[uriArgNumber].equals("local")) {
           agent.connector = new ConsoleOutConnector(agent);
-        else {
-          if (!args[uriArgNumber].contains("://"))
+        } else {
+          if (!args[uriArgNumber].contains("://")) {
             args[uriArgNumber] = "http://" + args[uriArgNumber];
+          }
           agent.connector = new HttpConnector(agent, args[uriArgNumber]);
         }
-      } else
-        agent.connector = new HttpConnector(agent);
-
+      } else {
+        String connectorType = conf.get("chukwa.agent.connector", 
+            "org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector");
+        agent.connector = (Connector) Class.forName(connectorType).newInstance();
+      }
       agent.connector.start();
 
       log.info("local agent started on port " + agent.getControlSock().portno);
-      System.out.close();
-      System.err.close();
+      //System.out.close();
+      //System.err.close();
     } catch (AlreadyRunningException e) {
       log.error("agent started already on this machine with same portno;"
           + " bailing out");
@@ -219,108 +333,6 @@ public class ChukwaAgent implements Adap
     }
   }
 
-  public ChukwaAgent() throws AlreadyRunningException {
-    this(new Configuration());
-  }
-
-  public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
-    ChukwaAgent.agent = this;
-    this.conf = conf;
-    
-    // almost always just reading this; so use a ConcurrentHM.
-    // since we wrapped the offset, it's not a structural mod.
-    adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
-    adaptorsByName = new HashMap<String, Adaptor>();
-    checkpointNumber = 0;
-
-    boolean DO_CHECKPOINT_RESTORE = conf.getBoolean(
-        "chukwaAgent.checkpoint.enabled", true);
-    CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
-        "chukwa_checkpoint_");
-    final int CHECKPOINT_INTERVAL_MS = conf.getInt(
-        "chukwaAgent.checkpoint.interval", 5000);
-    final int STATS_INTERVAL_MS = conf.getInt(
-        "chukwaAgent.stats.collection.interval", 10000);
-    final int STATS_DATA_TTL_MS = conf.getInt(
-        "chukwaAgent.stats.data.ttl", 1200000);
-
-    if (conf.get("chukwaAgent.checkpoint.dir") != null)
-      checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
-    else
-      DO_CHECKPOINT_RESTORE = false;
-
-    if (checkpointDir != null && !checkpointDir.exists()) {
-      checkpointDir.mkdirs();
-    }
-    tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
-    DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\""));
-
-    log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
-    log.info("Config - checkpointDir: [" + checkpointDir + "]");
-    log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
-        + "]");
-    log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
-    log.info("Config - STATS_INTERVAL_MS: [" + STATS_INTERVAL_MS + "]");
-    log.info("Config - tags: [" + tags + "]");
-
-    if (DO_CHECKPOINT_RESTORE) {
-      log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
-    }
-
-    File initialAdaptors = null;
-    if (conf.get("chukwaAgent.initial_adaptors") != null)
-      initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));
-
-    try {
-      if (DO_CHECKPOINT_RESTORE) {
-        restoreFromCheckpoint();
-      }
-    } catch (IOException e) {
-      log.warn("failed to restart from checkpoint: ", e);
-    }
-
-    try {
-      if (initialAdaptors != null && initialAdaptors.exists())
-        readAdaptorsFile(initialAdaptors); 
-    } catch (IOException e) {
-      log.warn("couldn't read user-specified file "
-          + initialAdaptors.getAbsolutePath());
-    }
-
-    controlSock = new AgentControlSocketListener(this);
-    try {
-      controlSock.tryToBind(); // do this synchronously; if it fails, we know
-      // another agent is running.
-      controlSock.start(); // this sets us up as a daemon
-      log.info("control socket started on port " + controlSock.portno);
-
-      // start the HTTP server with stats collection
-      try {
-        this.adaptorStatsManager = new OffsetStatsManager(STATS_DATA_TTL_MS);
-        this.statsCollector = new Timer("ChukwaAgent Stats Collector");
-
-        startHttpServer(conf);
-
-        statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
-                STATS_INTERVAL_MS, STATS_INTERVAL_MS);
-      } catch (Exception e) {
-        log.error("Couldn't start HTTP server", e);
-        throw new RuntimeException(e);
-      }
-
-      // shouldn't start checkpointing until we're finishing launching
-      // adaptors on boot
-      if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir != null) {
-        checkpointer = new Timer();
-        checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
-      }
-    } catch (IOException e) {
-      log.info("failed to bind to socket; aborting agent launch", e);
-      throw new AlreadyRunningException();
-    }
-
-  }
-
   private void startHttpServer(Configuration conf) throws Exception {
     int portNum = conf.getInt("chukwaAgent.http.port", 9090);
     String jaxRsAddlPackages = conf.get("chukwaAgent.http.rest.controller.packages");
@@ -716,7 +728,7 @@ public class ChukwaAgent implements Adap
     return o;
   }
   
-  Connector getConnector() {
+  public Connector getConnector() {
     return connector;
   }
 

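Note: construction logic now lives in ChukwaAgent(Configuration), and getAgent() lazily builds the shared instance, so callers (e.g. FileTailerStressTest below) fetch rather than construct the agent. A minimal sketch, assuming no other agent holds the control socket on this machine:

    import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
    import org.apache.hadoop.chukwa.datacollection.connector.Connector;

    public class AgentSingletonDemo {
      public static void main(String[] args) {
        // Builds the agent on first call; returns null if one is already running.
        ChukwaAgent agent = ChukwaAgent.getAgent();
        if (agent != null) {
          Connector connector = agent.getConnector(); // public as of this commit
          System.out.println("active connector: " + connector);
        }
      }
    }
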
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Mon Jun 30 02:13:10 2014
@@ -36,6 +36,7 @@ import javax.servlet.http.HttpServlet;
 import java.io.File;
 import java.util.*;
 
+@Deprecated
 public class CollectorStub {
 
   static int THREADS = 120;

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java Mon Jun 30 02:13:10 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.chukwa.extracti
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 
+@Deprecated
 public class CommitCheckServlet extends HttpServlet {
 
   private static final long serialVersionUID = -4627538252371890849L;

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java Mon Jun 30 02:13:10 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.chukwa.*;
 import org.apache.hadoop.chukwa.datacollection.writer.ExtractorWriter;
 import org.apache.hadoop.conf.Configuration;
 
+@Deprecated
 public class LogDisplayServlet extends HttpServlet {
   
   /*

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Mon Jun 30 02:13:10 2014
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.compress.Com
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.log4j.Logger;
 
+@Deprecated
 public class ServletCollector extends HttpServlet {
 
   static final boolean FANCY_DIAGNOSTICS = false;

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java Mon Jun 30 02:13:10 2014
@@ -26,6 +26,7 @@ import java.util.*;
 /**
  * One per post
  */
+@Deprecated
 public class ServletDiagnostics {
 
   static Logger log = Logger.getLogger(ServletDiagnostics.class);

Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java?rev=1606617&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java (added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java Mon Jun 30 02:13:10 2014
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.connector;
+
+/**
+ * This class is responsible for setting up connections with the configured
+ * storage writers based on the configuration in chukwa-agent-conf.xml.
+ * 
+ * On error, tries the list of available storage writers, pauses for a minute, 
+ * and then repeats.
+ *
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.Connector;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
+import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+public class PipelineConnector implements Connector, Runnable {
+
+  static Logger log = Logger.getLogger(PipelineConnector.class);
+
+  Timer statTimer = null;
+  volatile int chunkCount = 0;
+  
+  int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
+  int MIN_POST_INTERVAL = 5 * 1000;
+  public static final String MIN_POST_INTERVAL_OPT = "pipelineConnector.minPostInterval";
+  public static final String MAX_SIZE_PER_POST_OPT = "pipelineConnector.maxPostSize";
+  public static final String ASYNC_ACKS_OPT = "pipelineConnector.asyncAcks";
+
+  ChunkQueue chunkQueue;
+
+  private static volatile ChukwaAgent agent = null;
+
+  private volatile boolean stopMe = false;
+  protected ChukwaWriter writers = null;
+
+  public PipelineConnector() {
+    // report the number of chunks sent once a minute
+    statTimer = new Timer();
+    statTimer.schedule(new TimerTask() {
+      public void run() {
+        int count = chunkCount;
+        chunkCount = 0;
+        log.info("# Data chunks sent since last report: " + count);
+      }
+    }, 100, 60 * 1000);
+  }
+  
+  public void start() {
+    chunkQueue = DataFactory.getInstance().getEventQueue();
+    agent = ChukwaAgent.getAgent();
+    Configuration conf = agent.getConfiguration();
+    MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
+    MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
+    try {
+      writers = new PipelineStageWriter(conf);
+      (new Thread(this, "Pipeline connector thread")).start();
+    } catch(Exception e) {
+      log.error("Pipeline initialization error: ", e);
+    }
+  }
+
+  public void shutdown() {
+    stopMe = true;
+    try {
+      writers.close();
+    } catch (WriterException e) {
+      log.warn("Shutdown error: ",e);
+    }
+  }
+
+  public void run() {
+    log.info("PipelineConnector started at time:" + System.currentTimeMillis());
+
+    try {
+      long lastPost = System.currentTimeMillis();
+      while (!stopMe) {
+        List<Chunk> newQueue = new ArrayList<Chunk>();
+        try {
+          // get all ready chunks from the chunkQueue to be sent
+          chunkQueue.collect(newQueue, MAX_SIZE_PER_POST);
+        } catch (InterruptedException e) {
+          log.warn("thread interrupted during chunkQueue.collect()");
+          Thread.currentThread().interrupt();
+          break;
+        }
+        CommitStatus result = writers.add(newQueue);
+        if(result.equals(ChukwaWriter.COMMIT_OK)) {
+          chunkCount = newQueue.size();
+          for (Chunk c : newQueue) {
+            agent.reportCommit(c.getInitiator(), c.getSeqID());
+          }          
+        }
+        long now = System.currentTimeMillis();
+        long delta = MIN_POST_INTERVAL - now + lastPost;
+        if(delta > 0) {
+          Thread.sleep(delta); // wait for stuff to accumulate
+        }
+        lastPost = now;
+      } // end of try forever loop
+      log.info("received stop() command so exiting run() loop to shutdown connector");
+    } catch (WriterException e) {
+      log.warn("PipelineStageWriter Exception: ", e);
+    } catch (OutOfMemoryError e) {
+      log.warn("Bailing out", e);
+      DaemonWatcher.bailout(-1);
+    } catch (InterruptedException e) {
+      // do nothing, let thread die.
+      log.warn("Bailing out", e);
+      DaemonWatcher.bailout(-1);
+    } catch (Throwable e) {
+      log.error("connector failed; shutting down agent: ", e);
+      throw new RuntimeException("Shutdown pipeline connector.");
+    }
+  }
+
+  @Override
+  public void reloadConfiguration() {
+  }
+  
+}

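Note: ChukwaAgent.main() now chooses the connector reflectively from chukwa.agent.connector, defaulting to the PipelineConnector added above. A minimal sketch of that selection in isolation (the demo class is hypothetical; in the agent, start() is only called once the agent itself is up):

    import org.apache.hadoop.chukwa.datacollection.connector.Connector;
    import org.apache.hadoop.conf.Configuration;

    public class ConnectorSelectionDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same lookup and default as ChukwaAgent.main() in this commit.
        String connectorType = conf.get("chukwa.agent.connector",
            "org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector");
        Connector connector = (Connector) Class.forName(connectorType).newInstance();
        // PipelineConnector.start() would look up ChukwaAgent.getAgent(),
        // so this sketch stops at instantiation.
        System.out.println("selected connector: " + connector.getClass().getName());
      }
    }
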
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java Mon Jun 30 02:13:10 2014
@@ -81,7 +81,7 @@ public class FileTailerStressTest {
       server.setStopAtShutdown(false);
 
       Thread.sleep(1000);
-      ChukwaAgent agent = new ChukwaAgent();
+      ChukwaAgent agent = ChukwaAgent.getAgent();
       HttpConnector connector = new HttpConnector(agent,
           "http://localhost:9990/chukwa");
       connector.start();

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java Mon Jun 30 02:13:10 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.chukwa.datacol
 import java.util.List;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.collector.servlet.LogDisplayServlet;
-import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
 import org.apache.hadoop.conf.Configuration;
 
 public class ExtractorWriter extends PipelineableWriter {

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java Mon Jun 30 02:13:10 2014
@@ -20,7 +20,9 @@ package org.apache.hadoop.chukwa.datacol
 
 
 import java.util.List;
+
 import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
@@ -35,6 +37,15 @@ public class PipelineStageWriter impleme
 
   ChukwaWriter writer; // head of pipeline
 
+  public PipelineStageWriter() throws WriterException {
+    Configuration conf = new ChukwaConfiguration();
+    init(conf);
+  }
+  
+  public PipelineStageWriter(Configuration conf) throws WriterException {
+    init(conf);
+  }
+  
   @Override
   public CommitStatus add(List<Chunk> chunks) throws WriterException {
     return writer.add(chunks);
@@ -47,8 +58,8 @@ public class PipelineStageWriter impleme
 
   @Override
   public void init(Configuration conf) throws WriterException {
-    if (conf.get("chukwaCollector.pipeline") != null) {
-      String pipeline = conf.get("chukwaCollector.pipeline");
+    if (conf.get("chukwa.pipeline") != null) {
+      String pipeline = conf.get("chukwa.pipeline");
       try {
         String[] classes = pipeline.split(",");
         log.info("using pipelined writers, pipe length is " + classes.length);
@@ -65,7 +76,7 @@ public class PipelineStageWriter impleme
           Object st = stageClass.newInstance();
           if (!(st instanceof PipelineableWriter))
             log.error("class " + classes[i]
-                + " in processing pipeline isn't a pipeline stage");
+                + " in processing pipeline isn't a PipelineableWriter.");
 
           PipelineableWriter stage = (PipelineableWriter) stageClass
               .newInstance();
@@ -98,7 +109,7 @@ public class PipelineStageWriter impleme
         throw new WriterException("bad pipeline");
       }
     } else {
-      throw new WriterException("must set chukwaCollector.pipeline");
+      throw new WriterException("must set chukwa.pipeline");
     }
   }
 

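Note: with the key renamed to chukwa.pipeline and the new Configuration-taking constructor, a pipeline can chain writers; every stage except the last must be a PipelineableWriter. A minimal sketch chaining SocketTeeWriter into HBaseWriter (both touched in this commit), assuming a reachable HBase quorum and a free tee port:

    import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
    import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
    import org.apache.hadoop.conf.Configuration;

    public class PipelineChainDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new ChukwaConfiguration();
        // Comma-separated stages; init() wires each stage to the next.
        conf.set("chukwa.pipeline",
            "org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,"
            + "org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter");
        PipelineStageWriter writer = new PipelineStageWriter(conf);
        // writer.add(chunks) would now tee each chunk to socket clients
        // before it lands in HBase.
        writer.close();
      }
    }
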
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java Mon Jun 30 02:13:10 2014
@@ -20,10 +20,8 @@ package org.apache.hadoop.chukwa.datacol
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.regex.PatternSyntaxException;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.util.Filter;
-import org.apache.hadoop.chukwa.util.RegexUtil;
 import org.apache.hadoop.chukwa.util.RegexUtil.CheckedPatternSyntaxException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
@@ -80,8 +78,8 @@ public class SocketTeeWriter extends Pip
   class SocketListenThread extends Thread {
     ServerSocket s;
     public SocketListenThread(Configuration conf) throws IOException {
-      int portno = conf.getInt("chukwaCollector.tee.port", DEFAULT_PORT);
-      USE_KEEPALIVE = conf.getBoolean("chukwaCollector.tee.keepalive", true);
+      int portno = conf.getInt("chukwa.tee.port", DEFAULT_PORT);
+      USE_KEEPALIVE = conf.getBoolean("chukwa.tee.keepalive", true);
       s = new ServerSocket(portno);
       setDaemon(true);
     }

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java Mon Jun 30 02:13:10 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.datacollection.writer.hbase;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -38,8 +39,12 @@ import org.apache.hadoop.chukwa.util.Dae
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
@@ -55,7 +60,7 @@ public class HBaseWriter extends Pipelin
   private Reporter reporter;
   private ChukwaConfiguration conf;
   String defaultProcessor;
-  private HTablePool pool;
+  private HConnection connection;
   private Configuration hconf;
   
   private class StatReportingTask extends TimerTask {
@@ -77,20 +82,20 @@ public class HBaseWriter extends Pipelin
     }
   };
 
-  public HBaseWriter() {
+  public HBaseWriter() throws IOException {
     this(true);
   }
 
-  public HBaseWriter(boolean reportStats) {
+  public HBaseWriter(boolean reportStats) throws IOException {
     /* HBase Version >= 0.89.x */
     this(reportStats, new ChukwaConfiguration(), HBaseConfiguration.create());
   }
 
-  public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) {
+  public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) throws IOException {
     this(true, conf, hconf);
   }
 
-  private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) {
+  private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) throws IOException {
     this.reportStats = reportStats;
     this.conf = conf;
     this.hconf = hconf;
@@ -99,7 +104,16 @@ public class HBaseWriter extends Pipelin
       "chukwa.demux.mapper.default.processor",
       "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
     Demux.jobConf = conf;
-    log.info("hbase.zookeeper.quorum: " + hconf.get("hbase.zookeeper.quorum"));
+    log.info("hbase.zookeeper.quorum: " + hconf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + hconf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
+    if (reportStats) {
+      statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
+    }
+    output = new OutputCollector();
+    reporter = new Reporter();
+    if(conf.getBoolean("hbase.writer.verify.schema", false)) {
+      verifyHbaseSchema();      
+    }
+    connection = HConnectionManager.createConnection(hconf);
   }
 
   public void close() {
@@ -109,15 +123,6 @@ public class HBaseWriter extends Pipelin
   }
 
   public void init(Configuration conf) throws WriterException {
-    if (reportStats) {
-      statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
-    }
-    output = new OutputCollector();
-    reporter = new Reporter();
-    if(conf.getBoolean("hbase.writer.verify.schema", false)) {
-      verifyHbaseSchema();      
-    }
-    pool = new HTablePool(hconf, 60);
   }
 
   private boolean verifyHbaseTable(HBaseAdmin admin, Table table) {
@@ -188,12 +193,12 @@ public class HBaseWriter extends Pipelin
             Table table = findHBaseTable(chunk.getDataType());
 
             if(table!=null) {
-              HTableInterface hbase = pool.getTable(table.name().getBytes());
+              HTableInterface hbase = connection.getTable(table.name());              
               MapProcessor processor = getProcessor(chunk.getDataType());
               processor.process(new ChukwaArchiveKey(), chunk, output, reporter);
-
               hbase.put(output.getKeyValues());
-              pool.putTable(hbase);
+            } else {
+              log.warn("Error finding HBase table for data type:"+chunk.getDataType());
             }
           } catch (Exception e) {
             log.warn(output.getKeyValues());

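The writer now holds a single long-lived HConnection instead of an HTablePool: one heavyweight connection per process, with cheap per-use table handles obtained from it. Note that handles returned by getTable() are meant to be closed after use. A sketch of the pattern this patch adopts, against the HBase 0.94/0.96-era client API; the table name and cell contents are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ConnectionSketch {
      public static void main(String[] args) throws Exception {
        Configuration hconf = HBaseConfiguration.create();
        // One connection per process...
        HConnection connection = HConnectionManager.createConnection(hconf);
        try {
          // ...one lightweight handle per unit of work.
          HTableInterface table = connection.getTable("Test");
          try {
            Put put = new Put(Bytes.toBytes("rowkey"));
            put.add(Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
                    Bytes.toBytes("value"));
            table.put(put);
          } finally {
            table.close(); // replaces the old pool.putTable() bookkeeping
          }
        } finally {
          connection.close();
        }
      }
    }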
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java Mon Jun 30 02:13:10 2014
@@ -43,8 +43,8 @@ public class TestOffsetStatsManager exte
 
     // calculate 5 second average
     double rate = statsManager.calcAverageRate(dummyKey, 5);
-    assertTrue("Invalid average, expected about 1 byte/sec, found " + rate,
-                 Math.abs(1000 - rate) < 1.5);
+    assertTrue("Invalid average, expected about 1 kbyte/sec, found " + rate,
+                 Math.abs(rate / 1000) <= 1);
   }
 
   public void testCalcAverageRateStaleData() throws InterruptedException {

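The loosened assertion is worth a second look. The old check required the measured rate to land within 1.5 of 1000 bytes/sec; the new check, Math.abs(rate / 1000) <= 1, only requires the magnitude to stay at or below 1000. For example, a measured rate of 250 bytes/sec now passes (|250/1000| = 0.25), where it previously failed by a wide margin. The trade is precision for stability on slow or heavily loaded build machines.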
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSyslogAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSyslogAdaptor.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSyslogAdaptor.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSyslogAdaptor.java Mon Jun 30 02:13:10 2014
@@ -42,7 +42,7 @@ public class TestSyslogAdaptor extends T
     DatagramPacket p = new DatagramPacket(buf, buf.length);
     p.setSocketAddress(new InetSocketAddress("127.0.0.1",u.portno));
     send.send(p);
-    
+    send.close();
     synchronized(this) {
       wait(1000);
     }

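The added send.close() plugs a leaked DatagramSocket. On Java 7 and later the same cleanup falls out of try-with-resources, sketched below; the payload and port are illustrative, not values from the test.

    import java.net.DatagramPacket;
    import java.net.DatagramSocket;
    import java.net.InetSocketAddress;

    public class SyslogSend {
      public static void main(String[] args) throws Exception {
        byte[] buf = "<13>Feb  5 17:32:18 test: hello".getBytes();
        try (DatagramSocket send = new DatagramSocket()) {
          DatagramPacket p = new DatagramPacket(buf, buf.length);
          p.setSocketAddress(new InetSocketAddress("127.0.0.1", 9095));
          send.send(p);
        } // socket closed automatically, even if send() throws
      }
    }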
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java Mon Jun 30 02:13:10 2014
@@ -40,7 +40,7 @@ public class TestLogRotate extends TestC
 
   public void testLogRotate() throws IOException, InterruptedException,
       ChukwaAgent.AlreadyRunningException {
-    ChukwaAgent agent = new ChukwaAgent();
+    ChukwaAgent agent = ChukwaAgent.getAgent();
     // Remove any adaptor left over from previous run
     ChukwaConfiguration cc = new ChukwaConfiguration();
     int portno = cc.getInt("chukwaAgent.control.port", 9093);

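Switching from new ChukwaAgent() to ChukwaAgent.getAgent() matters because the agent is a process-wide singleton: a second constructor call in the same JVM raises AlreadyRunningException. A lifecycle sketch, under the assumption that getAgent() hands back (and, as of this patch, presumably creates on demand) the shared instance; the adaptor command string is illustrative.

    import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;

    public class AgentLifecycle {
      public static void main(String[] args) throws Exception {
        // Obtain the singleton rather than constructing a second agent.
        ChukwaAgent agent = ChukwaAgent.getAgent();
        String id = agent.processAddCommand(
            "add filetailer.FileTailingAdaptor raw /var/log/syslog 0");
        if (id == null) {
          System.err.println("adaptor registration failed");
        }
        agent.shutdown();
      }
    }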
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java Mon Jun 30 02:13:10 2014
@@ -21,33 +21,62 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.Map;
+
 import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
 import org.apache.hadoop.conf.Configuration;
+
 import junit.framework.TestCase;
+
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
 import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
 import org.apache.log4j.Level;
+import org.mortbay.log.Log;
 
 public class TestRCheckAdaptor extends TestCase implements ChunkReceiver {
   
   ChunkCatcherConnector chunks;
+  ChukwaAgent agent;
 
   public TestRCheckAdaptor() {
     chunks = new ChunkCatcherConnector();
     chunks.start();
   }
 
+  @Override
+  protected void setUp() throws InterruptedException {
+    Configuration conf = new ChukwaConfiguration();
+    conf.set("", "org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector");
+    try {
+      agent = new ChukwaAgent(conf);
+      Thread.sleep(2000);
+      Map<String, String> adaptorList = agent.getAdaptorList();
+      for(String id : adaptorList.keySet()) {
+        agent.stopAdaptor(id, false);
+      }
+    } catch (AlreadyRunningException e) {
+      fail("Agent is already running.");
+    }    
+  }
+  
+  @Override
+  protected void tearDown() throws InterruptedException {
+    agent.shutdown();
+    Thread.sleep(2000);
+  }
+  
   public void testBaseCases() throws IOException, InterruptedException,
       ChukwaAgent.AlreadyRunningException {
     Configuration conf = new Configuration();
     conf.set("chukwaAgent.control.port", "0");
     conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
         
-//    RCheckFTAdaptor.log.setLevel(Level.DEBUG);
     File baseDir = new File(System.getProperty("test.build.data", "/tmp") + "/rcheck");
     TestDirTailingAdaptor.createEmptyDir(baseDir);
     File tmpOutput = new File(baseDir, "rotateTest.1");
@@ -60,16 +89,14 @@ public class TestRCheckAdaptor extends T
     pw.println("Second");
     pw.close();
     
-    
-    ChukwaAgent agent = new ChukwaAgent(conf);
     String adaptorID = agent.processAddCommand("add lr = filetailer.RCheckFTAdaptor test " + tmpOutput.getAbsolutePath() + " 0");
     assertNotNull(adaptorID);
     
-    Chunk c = chunks.waitForAChunk(2000);
+    Chunk c = chunks.waitForAChunk();
     assertNotNull(c);
     assertTrue(c.getData().length == 6);
     assertTrue("First\n".equals(new String(c.getData())));
-    c = chunks.waitForAChunk(2000);
+    c = chunks.waitForAChunk();
     assertNotNull(c);
     assertTrue(c.getData().length == 7);    
     assertTrue("Second\n".equals(new String(c.getData())));
@@ -77,7 +104,7 @@ public class TestRCheckAdaptor extends T
     pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
     pw.println("Third");
     pw.close();
-    c = chunks.waitForAChunk(2000);
+    c = chunks.waitForAChunk();
     
     assertNotNull(c);
     assertTrue(c.getData().length == 6);    
@@ -88,7 +115,7 @@ public class TestRCheckAdaptor extends T
     pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
     pw.println("Fourth");
     pw.close();
-    c = chunks.waitForAChunk(2000);
+    c = chunks.waitForAChunk();
 
     assertNotNull(c);
     System.out.println("got " + new String(c.getData()));
@@ -101,22 +128,19 @@ public class TestRCheckAdaptor extends T
     pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
     pw.println("Fifth");
     pw.close();
-    c = chunks.waitForAChunk(2000);
+    c = chunks.waitForAChunk();
     assertNotNull(c);
     System.out.println("got " + new String(c.getData()));
     assertTrue("Fifth\n".equals(new String(c.getData())));
 
-    agent.shutdown();
-    Thread.sleep(2000);
   }
   
-  
   public void testContinuously() throws Exception {
     File baseDir = new File(System.getProperty("test.build.data", "/tmp") + "/rcheck");
     TestDirTailingAdaptor.createEmptyDir(baseDir);
     File tmpOutput = new File(baseDir, "continuousTest");
     PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
-    LWFTAdaptor.tailer.SAMPLE_PERIOD_MS = 2000;
+    //LWFTAdaptor.tailer.SAMPLE_PERIOD_MS = 2000;
 
 //    RCheckFTAdaptor.log.setLevel(Level.DEBUG);
     RCheckFTAdaptor rca = new RCheckFTAdaptor();

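The new setUp()/tearDown() pair is a pattern the other agent tests in this patch repeat almost verbatim: create the agent once per test, clear any adaptors checkpointed by a previous run, and shut down cleanly afterwards. One way to factor it, sketched as a hypothetical JUnit 3 base class (AgentTestBase is not a class in this patch):

    import java.util.Map;
    import junit.framework.TestCase;
    import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
    import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
    import org.apache.hadoop.conf.Configuration;

    public abstract class AgentTestBase extends TestCase {
      protected ChukwaAgent agent;

      @Override
      protected void setUp() throws Exception {
        Configuration conf = new ChukwaConfiguration();
        agent = new ChukwaAgent(conf);
        // Remove adaptors left over from a previous run so each test
        // starts from a clean slate.
        Map<String, String> adaptors = agent.getAdaptorList();
        for (String id : adaptors.keySet()) {
          agent.stopAdaptor(id, false);
        }
      }

      @Override
      protected void tearDown() throws Exception {
        agent.shutdown();
        Thread.sleep(2000); // give the control port time to free up
      }
    }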
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java Mon Jun 30 02:13:10 2014
@@ -49,7 +49,7 @@ public class TestAdaptorController exten
   StringBuilder sb;
 
   protected void setUp() throws Exception {
-    agent = new ChukwaAgent();
+    agent = ChukwaAgent.getAgent();
 
     ServletHolder servletHolder = new ServletHolder(ServletContainer.class);
     servletHolder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",

Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java Mon Jun 30 02:13:10 2014
@@ -31,7 +31,7 @@ public class TestFailedCollector extends
 
   public void testFailedCollector() {
     try {
-      ChukwaAgent agent = new ChukwaAgent();
+      ChukwaAgent agent = ChukwaAgent.getAgent();
       boolean failed = false;
       HttpConnector connector = new HttpConnector(agent,
           "http://localhost:1234/chukwa");

Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java Mon Jun 30 02:13:10 2014
@@ -18,6 +18,7 @@
 package org.apache.hadoop.chukwa.datacollection.writer;
 
 
+import java.io.IOException;
 import java.util.ArrayList;
 
 import junit.framework.Assert;
@@ -28,11 +29,11 @@ import org.apache.hadoop.chukwa.ChunkImp
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -52,36 +53,33 @@ public class TestHBaseWriter extends Tes
   private byte[] table = Bytes.toBytes("Test");
   private byte[] test = Bytes.toBytes("1234567890 Key Value");
   private ChukwaConfiguration cc;
+  private LocalHBaseCluster cluster;
   long timestamp = 1234567890;
   
   public TestHBaseWriter() {
     cc = new ChukwaConfiguration();
-
-    conf = HBaseConfiguration.create();
-    conf.set("hbase.hregion.memstore.flush.size", String.valueOf(128*1024));
+  }
+  
+  public void setUp() {
     try {
-      util = new HBaseTestingUtility(conf);
+      util = new HBaseTestingUtility();
       util.startMiniZKCluster();
-      util.getConfiguration().setBoolean("dfs.support.append", true);
-      util.startMiniCluster(2);
-      HTableDescriptor desc = new HTableDescriptor();
+      util.startMiniCluster();
+      conf = util.getConfiguration();
+      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("Test"));
       HColumnDescriptor family = new HColumnDescriptor(columnFamily);
-      desc.setName(table);
       desc.addFamily(family);
       util.getHBaseAdmin().createTable(desc);
 
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());
-    }
-  }
-  
-  public void setup() {
-    
+    }    
   }
   
-  public void tearDown() {
-    
+  public void tearDown() throws Exception {
+    util.shutdownMiniCluster();
+    util.shutdownMiniZKCluster();
   }
   
   public void testWriters() {
@@ -90,7 +88,6 @@ public class TestHBaseWriter extends Tes
     try {      
       cc.set("hbase.demux.package", "org.apache.chukwa.datacollection.writer.test.demux");
       cc.set("TextParser","org.apache.hadoop.chukwa.datacollection.writer.test.demux.TextParser");
-      conf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1");
       hbw = new HBaseWriter(cc, conf);
       hbw.init(cc);
       if(hbw.add(chunks)!=ChukwaWriter.COMMIT_OK) {
@@ -103,8 +100,8 @@ public class TestHBaseWriter extends Tes
       }
       // Cleanup and return
       scanner.close();
+      testTable.close();
       // Compare data in Hbase with generated chunks
-      util.shutdownMiniCluster();
     } catch (Exception e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());

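The test now lets HBaseTestingUtility own the configuration instead of seeding its own and pointing ZOOKEEPER_QUORUM at localhost, and it shuts the mini cluster down in tearDown() rather than mid-test. The essential lifecycle, as a sketch against the same 0.96-era test API; the table and column family names are illustrative.

    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;

    public class MiniClusterSketch {
      public static void main(String[] args) throws Exception {
        HBaseTestingUtility util = new HBaseTestingUtility();
        util.startMiniZKCluster();
        util.startMiniCluster();
        try {
          // HTableDescriptor now requires an explicit TableName.
          HTableDescriptor desc =
              new HTableDescriptor(TableName.valueOf("Test"));
          desc.addFamily(new HColumnDescriptor("TestColumnFamily"));
          util.getHBaseAdmin().createTable(desc);
          // ... exercise the writer against util.getConfiguration() ...
        } finally {
          // Mirrors the tearDown() order above.
          util.shutdownMiniCluster();
          util.shutdownMiniZKCluster();
        }
      }
    }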
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java Mon Jun 30 02:13:10 2014
@@ -32,11 +32,11 @@ public class TestSocketTee  extends Test
     
     Configuration conf = new Configuration();  
     
-    conf.set("chukwaCollector.pipeline",
+    conf.set("chukwa.pipeline",
         SocketTeeWriter.class.getCanonicalName()+","// note comma
         + CaptureWriter.class.getCanonicalName());
     
-    conf.set("chukwaCollector.writerClass", 
+    conf.set("chukwa.writerClass", 
         PipelineStageWriter.class.getCanonicalName());
     
     PipelineStageWriter psw = new PipelineStageWriter();

Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/dataloader/TestSocketDataLoader.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/dataloader/TestSocketDataLoader.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/dataloader/TestSocketDataLoader.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/dataloader/TestSocketDataLoader.java Mon Jun 30 02:13:10 2014
@@ -39,10 +39,10 @@ public class TestSocketDataLoader  exten
     
     Configuration conf = new Configuration();  
     
-    conf.set("chukwaCollector.pipeline",
+    conf.set("chukwa.pipeline",
         SocketTeeWriter.class.getCanonicalName());
     
-    conf.set("chukwaCollector.writerClass", 
+    conf.set("chukwa.writerClass", 
         PipelineStageWriter.class.getCanonicalName());
     
     PipelineStageWriter psw = new PipelineStageWriter();

Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/rest/resource/TestClientTrace.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/rest/resource/TestClientTrace.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/rest/resource/TestClientTrace.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/rest/resource/TestClientTrace.java Mon Jun 30 02:13:10 2014
@@ -42,8 +42,8 @@ public class TestClientTrace extends Set
         SocketTeeWriter.class.getCanonicalName());
     conf.set("chukwaCollector.writerClass", 
         PipelineStageWriter.class.getCanonicalName());    
-    PipelineStageWriter psw = new PipelineStageWriter();
     try {
+      PipelineStageWriter psw = new PipelineStageWriter();
       psw.init(conf);
       // Send a client trace chunk
       ArrayList<Chunk> l = new ArrayList<Chunk>();

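Moving the PipelineStageWriter construction inside the try block is more than tidying: with this patch, building a writer can itself throw (HBaseWriter's constructors now declare IOException), so the test presumably treats construction and init() as one guarded setup step, converting any failure into a test failure rather than an unhandled checked exception.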
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java Mon Jun 30 02:13:10 2014
@@ -40,6 +40,8 @@ import org.apache.hadoop.io.SequenceFile
 
 public class TestBackfillingLoader extends TestCase{
 
+  private String cluster = "chukwa";
+  
   public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscaped() {
     String tmpDir = System.getProperty("test.build.data", "/tmp");
     long ts = System.currentTimeMillis();
@@ -50,7 +52,7 @@ public class TestBackfillingLoader exten
     conf.set("chukwaCollector.outputDir", dataDir  + "/log/");
     conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
     
-    String cluster = "MyCluster_" + ts;
+    String cluster = "chukwa";
     String machine = "machine_" + ts;
     String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
     String recordType = "MyRecordType_" + ts;
@@ -110,7 +112,6 @@ public class TestBackfillingLoader exten
     conf.set("chukwaCollector.outputDir", dataDir  + "/log/");
     conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
     
-    String cluster = "MyCluster_" + ts;
     String machine = "machine_" + ts;
     String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor";
     String recordType = "MyRecordType_" + ts;
@@ -173,7 +174,6 @@ public class TestBackfillingLoader exten
     conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
     
     
-    String cluster = "MyCluster_" + ts;
     String machine = "machine_" + ts;
     String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
     String recordType = "MyRecordType_" + ts;
@@ -237,7 +237,6 @@ public class TestBackfillingLoader exten
     conf.set("chukwaCollector.writerClass", "org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter");
     conf.set("chukwaCollector.minPercentFreeDisk", "2");//so unit tests pass on machines with full-ish disks
 
-    String cluster = "MyCluster_" + ts;
     String machine = "machine_" + ts;
     String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
     String recordType = "MyRecordType_" + ts;
@@ -303,6 +302,9 @@ public class TestBackfillingLoader exten
 
 
       while (reader.next(key, chunk)) {
+        System.out.println("cluster:" + cluster);
+        System.out.println("cluster:" + RecordUtil.getClusterName(chunk));
+
         Assert.assertTrue(cluster.equals(RecordUtil.getClusterName(chunk)));
         Assert.assertTrue(dataType.equals(chunk.getDataType()));
         Assert.assertTrue(source.equals(chunk.getSource()));