You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2014/06/30 04:13:10 UTC
svn commit: r1606617 - in /chukwa/trunk: ./ bin/ conf/
src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/
src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/
src/main/java/org/apache/hadoop/chukwa/datacollection/agent...
Author: eyang
Date: Mon Jun 30 02:13:10 2014
New Revision: 1606617
URL: http://svn.apache.org/r1606617
Log:
CHUKWA-674. Integrated Chukwa collector feature to Chukwa Agent. (Eric Yang)
Added:
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
Modified:
chukwa/trunk/CHANGES.txt
chukwa/trunk/bin/chukwa
chukwa/trunk/conf/chukwa-agent-conf.xml
chukwa/trunk/conf/hbase.schema
chukwa/trunk/pom.xml
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SystemMetrics.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSyslogAdaptor.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/dataloader/TestSocketDataLoader.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/rest/resource/TestClientTrace.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java
Modified: chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/chukwa/trunk/CHANGES.txt?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/CHANGES.txt (original)
+++ chukwa/trunk/CHANGES.txt Mon Jun 30 02:13:10 2014
@@ -12,6 +12,8 @@ Release 0.6 - Unreleased
NEW FEATURES
+ CHUKWA-674. Integrated Chukwa collector feature to Chukwa Agent. (Eric Yang)
+
CHUKWA-705. Updated Chukwa to support JDK7 and updated to Hadoop 1.2.1 and HBase 0.96.1.1. (Eric Yang)
CHUKWA-699. Updated timeline widget stylesheet. (Eric Yang)
Modified: chukwa/trunk/bin/chukwa
URL: http://svn.apache.org/viewvc/chukwa/trunk/bin/chukwa?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/bin/chukwa (original)
+++ chukwa/trunk/bin/chukwa Mon Jun 30 02:13:10 2014
@@ -161,6 +161,11 @@ elif [ "$COMMAND" = "tail" ]; then
fi
pid="${CHUKWA_PID_DIR}/$PID.pid"
+
+if [ "$1" = "start" ]; then
+ shift
+fi
+
if [ "$1" = "stop" ]; then
if [ -e $pid ]; then
TARGET_PID=`cat $pid`
@@ -169,23 +174,29 @@ if [ "$1" = "stop" ]; then
sleep $CHUKWA_STOP_TIMEOUT
if kill -0 $TARGET_PID > /dev/null 2>&1; then
kill -9 $TARGET_PID
+ exit 1
fi
fi
else
echo "Cannot find PID file - $PID.pid; NO $PID to stop";
fi
+ exit 0
elif [ -f $pid ]; then
if kill -0 `cat $pid` > /dev/null 2>&1; then
echo $command running as process `cat $pid`. Stop it first.
exit 1
+ else
+ # pid file exists, but process is dead.
+ echo $command is not runnning, but pid file existed.
+ rm -f $pid
fi
-else
- # run command
- if [ "$BACKGROUND" = "false" ]; then
- ${JAVA_HOME}/bin/java ${JAVA_OPT} -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DCHUKWA_DATA_DIR=${CHUKWA_DATA_DIR} -DAPP=${APP} -Dlog4j.configuration=chukwa-log4j.properties -classpath ${CHUKWA_CONF_DIR}:${CLASSPATH}:${CHUKWA_CLASSPATH}:${tools} ${CLASS} $OPTS $@
- else
- exec ${JAVA_HOME}/bin/java ${JAVA_OPT} -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DCHUKWA_DATA_DIR=${CHUKWA_DATA_DIR} -DAPP=${APP} -Dlog4j.configuration=chukwa-log4j.properties -classpath ${CHUKWA_CONF_DIR}:${CLASSPATH}:${CHUKWA_CLASSPATH}:${tools} ${CLASS} $OPTS $@ &
- sleep 1
- fi
+fi
+
+# run command
+if [ "$BACKGROUND" = "false" ]; then
+ ${JAVA_HOME}/bin/java ${JAVA_OPT} -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DCHUKWA_DATA_DIR=${CHUKWA_DATA_DIR} -DAPP=${APP} -Dlog4j.configuration=chukwa-log4j.properties -classpath ${CHUKWA_CONF_DIR}:${CLASSPATH}:${CHUKWA_CLASSPATH}:${tools} ${CLASS} $OPTS $@
+else
+ exec ${JAVA_HOME}/bin/java ${JAVA_OPT} -Djava.library.path=${JAVA_LIBRARY_PATH} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DCHUKWA_LOG_DIR=${CHUKWA_LOG_DIR} -DCHUKWA_DATA_DIR=${CHUKWA_DATA_DIR} -DAPP=${APP} -Dlog4j.configuration=chukwa-log4j.properties -classpath ${CHUKWA_CONF_DIR}:${CLASSPATH}:${CHUKWA_CLASSPATH}:${tools} ${CLASS} $OPTS $@ &
+ sleep 1
fi
Modified: chukwa/trunk/conf/chukwa-agent-conf.xml
URL: http://svn.apache.org/viewvc/chukwa/trunk/conf/chukwa-agent-conf.xml?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/conf/chukwa-agent-conf.xml (original)
+++ chukwa/trunk/conf/chukwa-agent-conf.xml Mon Jun 30 02:13:10 2014
@@ -72,12 +72,18 @@
<property>
<name>chukwaAgent.collector.retryInterval</name>
- <value>20000</value>
+ <value>500</value>
<description>the number of milliseconds to wait between searches for a collector</description>
</property>
<property>
+ <name>chukwa.pipeline</name>
+ <value>org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter</value>
+ </property>
+
+ <property>
<name>syslog.adaptor.port.9095.facility.LOCAL1</name>
<value>HADOOP</value>
</property>
+
</configuration>
Modified: chukwa/trunk/conf/hbase.schema
URL: http://svn.apache.org/viewvc/chukwa/trunk/conf/hbase.schema?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/conf/hbase.schema (original)
+++ chukwa/trunk/conf/hbase.schema Mon Jun 30 02:13:10 2014
@@ -19,25 +19,25 @@ create "Jobs",
{NAME => "summary" }
create "SystemMetrics",
{NAME => "cpu", VERSIONS => 65535},
-{NAME => "system", VERSION => 65535},
-{NAME => "disk", VERSION => 65535},
-{NAME => "memory", VERSION => 65535},
-{NAME => "swap", VERSION => 65535},
-{NAME => "network", VERSION => 65535},
-{NAME => "tags", VERSION => 65535}
+{NAME => "system", VERSIONS => 65535},
+{NAME => "disk", VERSIONS => 65535},
+{NAME => "memory", VERSIONS => 65535},
+{NAME => "swap", VERSIONS => 65535},
+{NAME => "network", VERSIONS => 65535},
+{NAME => "tags", VERSIONS => 65535}
create "ClusterSummary",
{NAME=> "cpu", VERSIONS => 65535},
-{NAME => "system", VERSION => 65535},
-{NAME => "disk", VERSION => 65535},
-{NAME => "memory", VERSION => 65535},
-{NAME => "network", VERSION => 65535},
-{NAME => "swap", VERSION => 65535},
-{NAME => "hdfs", VERSION => 65535},
-{NAME => "mapreduce", VERSION => 65535}
+{NAME => "system", VERSIONS => 65535},
+{NAME => "disk", VERSIONS => 65535},
+{NAME => "memory", VERSIONS => 65535},
+{NAME => "network", VERSIONS => 65535},
+{NAME => "swap", VERSIONS => 65535},
+{NAME => "hdfs", VERSIONS => 65535},
+{NAME => "mapreduce", VERSIONS => 65535}
create "chukwa",
{NAME=>"chukwaAgent_chunkQueue", VERSIONS => 65535},
-{NAME => "chukwaAgent_metrics", VERSION => 65535},
-{NAME => "chukwaAgent_httpSender", VERSION => 65535}
+{NAME => "chukwaAgent_metrics", VERSIONS => 65535},
+{NAME => "chukwaAgent_httpSender", VERSIONS => 65535}
create "HBase",
{NAME => "master", VERSIONS => 65535},
{NAME => "regionserver", VERSIONS => 65535}
Modified: chukwa/trunk/pom.xml
URL: http://svn.apache.org/viewvc/chukwa/trunk/pom.xml?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/pom.xml (original)
+++ chukwa/trunk/pom.xml Mon Jun 30 02:13:10 2014
@@ -28,7 +28,7 @@
<package.pid.dir>/var/run/chukwa</package.pid.dir>
<package.release>1</package.release>
<package.version>0.6.0</package.version>
- <final.name>${project.artifactId}-incubating-${package.version}</final.name>
+ <final.name>${project.artifactId}-${package.version}</final.name>
<test.build.dir>${basedir}/target/test</test.build.dir>
<test.build.data>${test.build.dir}/data</test.build.data>
<test.cache.data>${test.build.dir}/cache</test.cache.data>
@@ -118,11 +118,9 @@
<systemPath>${basedir}/lib/confspellcheck.jar</systemPath>
</dependency>
<dependency>
- <groupId>com.mdimension</groupId>
+ <groupId>com.rubiconproject.oss</groupId>
<artifactId>jchronic</artifactId>
- <version>0.2.3</version>
- <scope>system</scope>
- <systemPath>${basedir}/lib/jchronic-0.2.3.jar</systemPath>
+ <version>0.2.6</version>
</dependency>
<dependency>
<groupId>org.apache.log4j</groupId>
@@ -132,11 +130,9 @@
<systemPath>${basedir}/lib/NagiosAppender-1.5.0.jar</systemPath>
</dependency>
<dependency>
- <groupId>org.hyperic</groupId>
+ <groupId>org.fusesource</groupId>
<artifactId>sigar</artifactId>
<version>1.6.4</version>
- <scope>system</scope>
- <systemPath>${basedir}/lib/sigar.jar</systemPath>
</dependency>
<dependency>
<groupId>org.json</groupId>
@@ -271,7 +267,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>10.0.1</version>
+ <version>12.0.1</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
@@ -403,29 +399,6 @@
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.6</version>
- <configuration>
- <encoding>UTF-8</encoding>
- </configuration>
- <executions>
- <execution>
- <id>chmod-jmx-file</id>
- <phase>process-resources</phase>
- <configuration>
- <tasks name="setup">
- <chmod file="target/conf/jmxremote.password" perm="600" />
- <chmod file="target/conf/jmxremote.access" perm="600" />
- </tasks>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.1</version>
<executions>
@@ -462,6 +435,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
<executions>
<execution>
<id>default-compile</id>
@@ -470,7 +444,9 @@
<goals>
<goal>compile</goal>
</goals>
- <compilerVersion>1.7</compilerVersion>
+ <!--<compilerArgument>-Xlint:unchecked</compilerArgument>
+ <compilerArgument>-Xlint:deprecation</compilerArgument>-->
+ <compilerVersion>1.6</compilerVersion>
<source>1.6</source>
<target>1.6</target>
<excludes>
@@ -485,6 +461,8 @@
<goals>
<goal>testCompile</goal>
</goals>
+ <!--<compilerArgument>-Xlint:unchecked</compilerArgument>
+ <compilerArgument>-Xlint:deprecation</compilerArgument>-->
<compilerVersion>1.7</compilerVersion>
<source>1.6</source>
<target>1.6</target>
@@ -514,7 +492,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.10</version>
<configuration>
- <skip>true</skip>
+ <skip>false</skip>
</configuration>
<executions>
<execution>
@@ -524,7 +502,6 @@
<goal>test</goal>
</goals>
<configuration>
- <skip>false</skip>
<argLine>-Xmx1024m -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=true -Dcom.sun.management.jmxremote.password.file=${basedir}/target/conf/jmxremote.password -Dcom.sun.management.jmxremote.access.file=${basedir}/target/conf/jmxremote.access -Dcom.sun.management.jmxremote.port=10100</argLine>
<reportsDirectory>${project.build.directory}/test-reports</reportsDirectory>
<forkMode>pertest</forkMode>
@@ -606,6 +583,7 @@
<phase>package</phase>
<warName>hicc</warName>
<warSourceDirectory>src/main/web/hicc</warSourceDirectory>
+ <webappDirectory>${project.build.directory}/hicc</webappDirectory>
</configuration>
<executions>
<execution>
@@ -645,12 +623,12 @@
<id>setup</id>
<phase>validate</phase>
<configuration>
- <tasks name="setup">
+ <target>
<mkdir dir="${basedir}/target"/>
<echo message="${VERSION}" file="${basedir}/target/VERSION"/>
<mkdir dir="${basedir}/target/clover"/>
<chmod dir="${basedir}/target/clover" perm="a+w" />
- </tasks>
+ </target>
</configuration>
<goals>
<goal>run</goal>
@@ -660,10 +638,10 @@
<id>chmod-jmx-file</id>
<phase>process-resources</phase>
<configuration>
- <tasks name="setup">
+ <target>
<chmod file="target/conf/jmxremote.password" perm="600" />
<chmod file="target/conf/jmxremote.access" perm="600" />
- </tasks>
+ </target>
</configuration>
<goals>
<goal>run</goal>
@@ -673,7 +651,7 @@
<id>test-setup</id>
<phase>generate-test-resources</phase>
<configuration>
- <tasks name="test-setup">
+ <target>
<delete dir="/tmp/chukwa/hicc" />
<mkdir dir="${test.build.dir}/var" />
<mkdir dir="${test.build.dir}/var/log" />
@@ -709,12 +687,11 @@
<expandproperties/>
</filterchain>
</copy>
- <copy file="${basedir}/src/test/resources/hbase-site.xml" tofile="${test.build.dir}/classes/hbase-site.xml"></copy>
<copy file="${basedir}/conf/log4j.properties" tofile="${test.build.dir}/conf/log4j.properties"></copy>
<copy file="${basedir}/conf/auth.conf" tofile="${test.build.dir}/conf/auth.conf"></copy>
<chmod file="${test.build.dir}/conf/jmxremote.password" perm="600" />
<chmod file="${test.build.dir}/conf/jmxremote.access" perm="600" />
- </tasks>
+ </target>
</configuration>
<goals>
<goal>run</goal>
@@ -861,7 +838,7 @@
<goal>single</goal>
</goals>
<configuration>
- <finalName>chukwa-incubating-src-${VERSION}</finalName>
+ <finalName>chukwa-src-${VERSION}</finalName>
<tarLongFileMode>gnu</tarLongFileMode>
<descriptors>
<descriptor>src/packages/tarball/src.xml</descriptor>
@@ -884,12 +861,11 @@
<id>hbase-0.96</id>
<activation>
<property>
- <name>hbase.profile</name>
- <value>0.96</value>
+ <name>!hbase.profile</name>
</property>
</activation>
<properties>
- <hbase.version>0.96.1.1-hadoop1</hbase.version>
+ <hbase.version>0.96.2-hadoop1</hbase.version>
</properties>
<dependencies>
<dependency>
@@ -982,15 +958,24 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop-compat</artifactId>
+ <classifier>tests</classifier>
+ <version>${hbase.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop1-compat</artifactId>
+ <classifier>tests</classifier>
+ <version>${hbase.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
<profile>
<id>hbase-0.94</id>
- <activation>
- <property>
- <name>!hbase.profile</name>
- </property>
- </activation>
<properties>
<hbase.version>0.94.9</hbase.version>
</properties>
@@ -1082,21 +1067,10 @@
<repositories>
<repository>
- <id>maven2-repository.dev.java.net</id>
- <name>Java.net Repository for Maven</name>
- <url>http://download.java.net/maven/2/</url>
- <layout>default</layout>
- </repository>
- <repository>
<id>codehaus</id>
<url>http://repository.codehaus.org/</url>
</repository>
<repository>
- <id>Sonatype-public</id>
- <name>SnakeYAML repository</name>
- <url>http://oss.sonatype.org/content/groups/public/</url>
- </repository>
- <repository>
<id>clojars</id>
<url>http://clojars.org/repo/</url>
</repository>
@@ -1137,7 +1111,6 @@
<artifactId>findbugs-maven-plugin</artifactId>
<version>2.3.3</version>
<configuration>
-<!-- <onlyAnalyze>org.apache.hadoop.chukwa.datacollection.*</onlyAnalyze> -->
<threshold>Normal</threshold>
<effort>Max</effort>
</configuration>
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Heatmap.java Mon Jun 30 02:13:10 2014
@@ -18,16 +18,11 @@
package org.apache.hadoop.chukwa.analysis.salsa.visualization;
-import prefuse.data.io.sql.*;
-import prefuse.data.expression.parser.*;
-import prefuse.data.expression.*;
-import prefuse.data.column.*;
-import prefuse.data.query.*;
+
import prefuse.data.*;
import prefuse.action.*;
import prefuse.action.layout.*;
import prefuse.action.assignment.*;
-import prefuse.visual.expression.*;
import prefuse.visual.*;
import prefuse.render.*;
import prefuse.util.*;
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/analysis/salsa/visualization/Swimlanes.java Mon Jun 30 02:13:10 2014
@@ -383,7 +383,7 @@ public class Swimlanes {
VisualItem item = null;
SwimlanesStatePalette pal = new SwimlanesStatePalette();
- Iterator curr_group_items = this.m_vis.items(this.m_group);
+ Iterator<?> curr_group_items = this.m_vis.items(this.m_group);
while (curr_group_items.hasNext()) {
item = (VisualItem) curr_group_items.next();
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SystemMetrics.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SystemMetrics.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SystemMetrics.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/sigar/SystemMetrics.java Mon Jun 30 02:13:10 2014
@@ -41,7 +41,7 @@ import org.apache.log4j.Logger;
public class SystemMetrics extends AbstractAdaptor {
static Logger log = Logger.getLogger(SystemMetrics.class);
- private long period = 60 * 1000;
+ private long period = 5 * 1000;
private SigarRunner runner;
private Timer timer;
@@ -50,14 +50,17 @@ public class SystemMetrics extends Abstr
int spOffset = args.indexOf(' ');
if (spOffset > 0) {
try {
- period = Integer.parseInt(args.substring(0, spOffset));
+ period = Long.parseLong(args.substring(0, spOffset));
period = period * 1000;
+ start(spOffset);
} catch (NumberFormatException e) {
StringBuilder buffer = new StringBuilder();
buffer.append("SystemMetrics: sample interval ");
buffer.append(args.substring(0, spOffset));
buffer.append(" can't be parsed.");
log.warn(buffer.toString());
+ } catch (AdaptorException e) {
+ log.warn("Error parsing parameter for SystemMetrics adaptor.");
}
}
return args;
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Mon Jun 30 02:13:10 2014
@@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHa
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
@@ -45,6 +46,7 @@ import org.apache.hadoop.chukwa.datacoll
import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
import org.apache.hadoop.chukwa.datacollection.connector.Connector;
import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector;
import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
@@ -52,13 +54,14 @@ import org.apache.hadoop.chukwa.util.Exc
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
-
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.thread.BoundedThreadPool;
+
import com.sun.jersey.spi.container.servlet.ServletContainer;
+
import edu.berkeley.confspell.*;
/**
@@ -79,21 +82,129 @@ public class ChukwaAgent implements Adap
// boolean WRITE_CHECKPOINTS = true;
static AgentMetrics agentMetrics = new AgentMetrics("ChukwaAgent", "metrics");
+ private static Logger log = Logger.getLogger(ChukwaAgent.class);
private static final int HTTP_SERVER_THREADS = 120;
private static Server jettyServer = null;
private OffsetStatsManager adaptorStatsManager = null;
private Timer statsCollector = null;
+ private static volatile Configuration conf = null;
+ private static volatile ChukwaAgent agent = null;
+ public Connector connector = null;
+
+ protected ChukwaAgent() throws AlreadyRunningException {
+ this(new ChukwaConfiguration());
+ }
+
+ public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
+ ChukwaAgent.agent = this;
+ this.conf = conf;
+
+ // almost always just reading this; so use a ConcurrentHM.
+ // since we wrapped the offset, it's not a structural mod.
+ adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
+ adaptorsByName = new HashMap<String, Adaptor>();
+ checkpointNumber = 0;
+
+ boolean DO_CHECKPOINT_RESTORE = conf.getBoolean(
+ "chukwaAgent.checkpoint.enabled", true);
+ CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
+ "chukwa_checkpoint_");
+ final int CHECKPOINT_INTERVAL_MS = conf.getInt(
+ "chukwaAgent.checkpoint.interval", 5000);
+ final int STATS_INTERVAL_MS = conf.getInt(
+ "chukwaAgent.stats.collection.interval", 10000);
+ final int STATS_DATA_TTL_MS = conf.getInt(
+ "chukwaAgent.stats.data.ttl", 1200000);
+
+ if (conf.get("chukwaAgent.checkpoint.dir") != null)
+ checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
+ else
+ DO_CHECKPOINT_RESTORE = false;
+
+ if (checkpointDir != null && !checkpointDir.exists()) {
+ checkpointDir.mkdirs();
+ }
+ tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
+ DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\""));
+
+ log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
+ log.info("Config - checkpointDir: [" + checkpointDir + "]");
+ log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
+ + "]");
+ log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
+ log.info("Config - STATS_INTERVAL_MS: [" + STATS_INTERVAL_MS + "]");
+ log.info("Config - tags: [" + tags + "]");
+
+ if (DO_CHECKPOINT_RESTORE) {
+ log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
+ }
+
+ File initialAdaptors = null;
+ if (conf.get("chukwaAgent.initial_adaptors") != null)
+ initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));
+
+ try {
+ if (DO_CHECKPOINT_RESTORE) {
+ restoreFromCheckpoint();
+ }
+ } catch (IOException e) {
+ log.warn("failed to restart from checkpoint: ", e);
+ }
+
+ try {
+ if (initialAdaptors != null && initialAdaptors.exists())
+ readAdaptorsFile(initialAdaptors);
+ } catch (IOException e) {
+ log.warn("couldn't read user-specified file "
+ + initialAdaptors.getAbsolutePath());
+ }
+
+ controlSock = new AgentControlSocketListener(this);
+ try {
+ controlSock.tryToBind(); // do this synchronously; if it fails, we know
+ // another agent is running.
+ controlSock.start(); // this sets us up as a daemon
+ log.info("control socket started on port " + controlSock.portno);
+
+ // start the HTTP server with stats collection
+ try {
+ this.adaptorStatsManager = new OffsetStatsManager(STATS_DATA_TTL_MS);
+ this.statsCollector = new Timer("ChukwaAgent Stats Collector");
+
+ startHttpServer(conf);
+
+ statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
+ STATS_INTERVAL_MS, STATS_INTERVAL_MS);
+ } catch (Exception e) {
+ log.error("Couldn't start HTTP server", e);
+ throw new RuntimeException(e);
+ }
+
+ // shouldn't start checkpointing until we're finishing launching
+ // adaptors on boot
+ if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir != null) {
+ checkpointer = new Timer();
+ checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
+ }
+ } catch (IOException e) {
+ log.info("failed to bind to socket; aborting agent launch", e);
+ throw new AlreadyRunningException();
+ }
- static Logger log = Logger.getLogger(ChukwaAgent.class);
- static ChukwaAgent agent = null;
+ }
public static ChukwaAgent getAgent() {
+ if(agent == null) {
+ try {
+ agent = new ChukwaAgent();
+ } catch(AlreadyRunningException e) {
+ log.error("Chukwa Agent is already running", e);
+ agent = null;
+ }
+ }
return agent;
}
- static Configuration conf = null;
- Connector connector = null;
-
// doesn't need an equals(), comparator, etc
public static class Offset {
public Offset(long l, String id) {
@@ -177,21 +288,24 @@ public class ChukwaAgent implements Adap
int uriArgNumber = 0;
if (args.length > 0) {
- if (args[uriArgNumber].equals("local"))
+ if (args[uriArgNumber].equals("local")) {
agent.connector = new ConsoleOutConnector(agent);
- else {
- if (!args[uriArgNumber].contains("://"))
+ } else {
+ if (!args[uriArgNumber].contains("://")) {
args[uriArgNumber] = "http://" + args[uriArgNumber];
+ }
agent.connector = new HttpConnector(agent, args[uriArgNumber]);
}
- } else
- agent.connector = new HttpConnector(agent);
-
+ } else {
+ String connectorType = conf.get("chukwa.agent.connector",
+ "org.apache.hadoop.chukwa.datacollection.connector.PipelineConnector");
+ agent.connector = (Connector) Class.forName(connectorType).newInstance();
+ }
agent.connector.start();
log.info("local agent started on port " + agent.getControlSock().portno);
- System.out.close();
- System.err.close();
+ //System.out.close();
+ //System.err.close();
} catch (AlreadyRunningException e) {
log.error("agent started already on this machine with same portno;"
+ " bailing out");
@@ -219,108 +333,6 @@ public class ChukwaAgent implements Adap
}
}
- public ChukwaAgent() throws AlreadyRunningException {
- this(new Configuration());
- }
-
- public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
- ChukwaAgent.agent = this;
- this.conf = conf;
-
- // almost always just reading this; so use a ConcurrentHM.
- // since we wrapped the offset, it's not a structural mod.
- adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
- adaptorsByName = new HashMap<String, Adaptor>();
- checkpointNumber = 0;
-
- boolean DO_CHECKPOINT_RESTORE = conf.getBoolean(
- "chukwaAgent.checkpoint.enabled", true);
- CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
- "chukwa_checkpoint_");
- final int CHECKPOINT_INTERVAL_MS = conf.getInt(
- "chukwaAgent.checkpoint.interval", 5000);
- final int STATS_INTERVAL_MS = conf.getInt(
- "chukwaAgent.stats.collection.interval", 10000);
- final int STATS_DATA_TTL_MS = conf.getInt(
- "chukwaAgent.stats.data.ttl", 1200000);
-
- if (conf.get("chukwaAgent.checkpoint.dir") != null)
- checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
- else
- DO_CHECKPOINT_RESTORE = false;
-
- if (checkpointDir != null && !checkpointDir.exists()) {
- checkpointDir.mkdirs();
- }
- tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
- DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown_cluster\""));
-
- log.info("Config - CHECKPOINT_BASE_NAME: [" + CHECKPOINT_BASE_NAME + "]");
- log.info("Config - checkpointDir: [" + checkpointDir + "]");
- log.info("Config - CHECKPOINT_INTERVAL_MS: [" + CHECKPOINT_INTERVAL_MS
- + "]");
- log.info("Config - DO_CHECKPOINT_RESTORE: [" + DO_CHECKPOINT_RESTORE + "]");
- log.info("Config - STATS_INTERVAL_MS: [" + STATS_INTERVAL_MS + "]");
- log.info("Config - tags: [" + tags + "]");
-
- if (DO_CHECKPOINT_RESTORE) {
- log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
- }
-
- File initialAdaptors = null;
- if (conf.get("chukwaAgent.initial_adaptors") != null)
- initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));
-
- try {
- if (DO_CHECKPOINT_RESTORE) {
- restoreFromCheckpoint();
- }
- } catch (IOException e) {
- log.warn("failed to restart from checkpoint: ", e);
- }
-
- try {
- if (initialAdaptors != null && initialAdaptors.exists())
- readAdaptorsFile(initialAdaptors);
- } catch (IOException e) {
- log.warn("couldn't read user-specified file "
- + initialAdaptors.getAbsolutePath());
- }
-
- controlSock = new AgentControlSocketListener(this);
- try {
- controlSock.tryToBind(); // do this synchronously; if it fails, we know
- // another agent is running.
- controlSock.start(); // this sets us up as a daemon
- log.info("control socket started on port " + controlSock.portno);
-
- // start the HTTP server with stats collection
- try {
- this.adaptorStatsManager = new OffsetStatsManager(STATS_DATA_TTL_MS);
- this.statsCollector = new Timer("ChukwaAgent Stats Collector");
-
- startHttpServer(conf);
-
- statsCollector.scheduleAtFixedRate(new StatsCollectorTask(),
- STATS_INTERVAL_MS, STATS_INTERVAL_MS);
- } catch (Exception e) {
- log.error("Couldn't start HTTP server", e);
- throw new RuntimeException(e);
- }
-
- // shouldn't start checkpointing until we're finishing launching
- // adaptors on boot
- if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir != null) {
- checkpointer = new Timer();
- checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
- }
- } catch (IOException e) {
- log.info("failed to bind to socket; aborting agent launch", e);
- throw new AlreadyRunningException();
- }
-
- }
-
private void startHttpServer(Configuration conf) throws Exception {
int portNum = conf.getInt("chukwaAgent.http.port", 9090);
String jaxRsAddlPackages = conf.get("chukwaAgent.http.rest.controller.packages");
@@ -716,7 +728,7 @@ public class ChukwaAgent implements Adap
return o;
}
- Connector getConnector() {
+ public Connector getConnector() {
return connector;
}
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Mon Jun 30 02:13:10 2014
@@ -36,6 +36,7 @@ import javax.servlet.http.HttpServlet;
import java.io.File;
import java.util.*;
+@Deprecated
public class CollectorStub {
static int THREADS = 120;
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java Mon Jun 30 02:13:10 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.chukwa.extracti
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
+@Deprecated
public class CommitCheckServlet extends HttpServlet {
private static final long serialVersionUID = -4627538252371890849L;
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java Mon Jun 30 02:13:10 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.chukwa.*;
import org.apache.hadoop.chukwa.datacollection.writer.ExtractorWriter;
import org.apache.hadoop.conf.Configuration;
+@Deprecated
public class LogDisplayServlet extends HttpServlet {
/*
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Mon Jun 30 02:13:10 2014
@@ -46,6 +46,7 @@ import org.apache.hadoop.io.compress.Com
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
+@Deprecated
public class ServletCollector extends HttpServlet {
static final boolean FANCY_DIAGNOSTICS = false;
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java Mon Jun 30 02:13:10 2014
@@ -26,6 +26,7 @@ import java.util.*;
/**
* One per post
*/
+@Deprecated
public class ServletDiagnostics {
static Logger log = Logger.getLogger(ServletDiagnostics.class);
Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java?rev=1606617&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java (added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java Mon Jun 30 02:13:10 2014
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.connector;
+
+/**
+ * This class is responsible for setting up connections with configured
+ * storage writers base on configuration of chukwa_agent.xml.
+ *
+ * On error, tries the list of available storage writers, pauses for a minute,
+ * and then repeats.
+ *
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.Connector;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
+import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.chukwa.util.DaemonWatcher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+public class PipelineConnector implements Connector, Runnable {
+
+ static Logger log = Logger.getLogger(PipelineConnector.class);
+
+ Timer statTimer = null;
+ volatile int chunkCount = 0;
+
+ int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
+ int MIN_POST_INTERVAL = 5 * 1000;
+ public static final String MIN_POST_INTERVAL_OPT = "pipelineConnector.minPostInterval";
+ public static final String MAX_SIZE_PER_POST_OPT = "pipelineConnector.maxPostSize";
+ public static final String ASYNC_ACKS_OPT = "pipelineConnector.asyncAcks";
+
+ ChunkQueue chunkQueue;
+
+ private static volatile ChukwaAgent agent = null;
+
+ private volatile boolean stopMe = false;
+ protected ChukwaWriter writers = null;
+
+ public PipelineConnector() {
+ //instance initializer block
+ statTimer = new Timer();
+ statTimer.schedule(new TimerTask() {
+ public void run() {
+ int count = chunkCount;
+ chunkCount = 0;
+ log.info("# Data chunks sent since last report: " + count);
+ }
+ }, 100, 60 * 1000);
+ }
+
+ public void start() {
+ chunkQueue = DataFactory.getInstance().getEventQueue();
+ agent = ChukwaAgent.getAgent();
+ Configuration conf = agent.getConfiguration();
+ MAX_SIZE_PER_POST = conf.getInt(MAX_SIZE_PER_POST_OPT, MAX_SIZE_PER_POST);
+ MIN_POST_INTERVAL = conf.getInt(MIN_POST_INTERVAL_OPT, MIN_POST_INTERVAL);
+ try {
+ writers = new PipelineStageWriter(conf);
+ (new Thread(this, "Pipeline connector thread")).start();
+ } catch(Exception e) {
+ log.error("Pipeline initialization error: ", e);
+ }
+ }
+
+ public void shutdown() {
+ stopMe = true;
+ try {
+ writers.close();
+ } catch (WriterException e) {
+ log.warn("Shutdown error: ",e);
+ }
+ }
+
+ public void run() {
+ log.info("PipelineConnector started at time:" + System.currentTimeMillis());
+
+ try {
+ long lastPost = System.currentTimeMillis();
+ while (!stopMe) {
+ List<Chunk> newQueue = new ArrayList<Chunk>();
+ try {
+ // get all ready chunks from the chunkQueue to be sent
+ chunkQueue.collect(newQueue, MAX_SIZE_PER_POST);
+ } catch (InterruptedException e) {
+ log.warn("thread interrupted during addChunks(ChunkQueue)");
+ Thread.currentThread().interrupt();
+ break;
+ }
+ CommitStatus result = writers.add(newQueue);
+ if(result.equals(ChukwaWriter.COMMIT_OK)) {
+ chunkCount = newQueue.size();
+ for (Chunk c : newQueue) {
+ agent.reportCommit(c.getInitiator(), c.getSeqID());
+ }
+ }
+ long now = System.currentTimeMillis();
+ long delta = MIN_POST_INTERVAL - now + lastPost;
+ if(delta > 0) {
+ Thread.sleep(delta); // wait for stuff to accumulate
+ }
+ lastPost = now;
+ } // end of try forever loop
+ log.info("received stop() command so exiting run() loop to shutdown connector");
+ } catch (WriterException e) {
+ log.warn("PipelineStageWriter Exception: ", e);
+ } catch (OutOfMemoryError e) {
+ log.warn("Bailing out", e);
+ DaemonWatcher.bailout(-1);
+ } catch (InterruptedException e) {
+ // do nothing, let thread die.
+ log.warn("Bailing out", e);
+ DaemonWatcher.bailout(-1);
+ } catch (Throwable e) {
+ log.error("connector failed; shutting down agent: ", e);
+ throw new RuntimeException("Shutdown pipeline connector.");
+ }
+ }
+
+ @Override
+ public void reloadConfiguration() {
+ }
+
+}
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java Mon Jun 30 02:13:10 2014
@@ -81,7 +81,7 @@ public class FileTailerStressTest {
server.setStopAtShutdown(false);
Thread.sleep(1000);
- ChukwaAgent agent = new ChukwaAgent();
+ ChukwaAgent agent = ChukwaAgent.getAgent();
HttpConnector connector = new HttpConnector(agent,
"http://localhost:9990/chukwa");
connector.start();
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java Mon Jun 30 02:13:10 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.chukwa.datacol
import java.util.List;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.collector.servlet.LogDisplayServlet;
-import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
import org.apache.hadoop.conf.Configuration;
public class ExtractorWriter extends PipelineableWriter {
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java Mon Jun 30 02:13:10 2014
@@ -20,7 +20,9 @@ package org.apache.hadoop.chukwa.datacol
import java.util.List;
+
import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
@@ -35,6 +37,15 @@ public class PipelineStageWriter impleme
ChukwaWriter writer; // head of pipeline
+ public PipelineStageWriter() throws WriterException {
+ Configuration conf = new ChukwaConfiguration();
+ init(conf);
+ }
+
+ public PipelineStageWriter(Configuration conf) throws WriterException {
+ init(conf);
+ }
+
@Override
public CommitStatus add(List<Chunk> chunks) throws WriterException {
return writer.add(chunks);
@@ -47,8 +58,8 @@ public class PipelineStageWriter impleme
@Override
public void init(Configuration conf) throws WriterException {
- if (conf.get("chukwaCollector.pipeline") != null) {
- String pipeline = conf.get("chukwaCollector.pipeline");
+ if (conf.get("chukwa.pipeline") != null) {
+ String pipeline = conf.get("chukwa.pipeline");
try {
String[] classes = pipeline.split(",");
log.info("using pipelined writers, pipe length is " + classes.length);
@@ -65,7 +76,7 @@ public class PipelineStageWriter impleme
Object st = stageClass.newInstance();
if (!(st instanceof PipelineableWriter))
log.error("class " + classes[i]
- + " in processing pipeline isn't a pipeline stage");
+ + " in processing pipeline isn't a PipelineableWriter.");
PipelineableWriter stage = (PipelineableWriter) stageClass
.newInstance();
@@ -98,7 +109,7 @@ public class PipelineStageWriter impleme
throw new WriterException("bad pipeline");
}
} else {
- throw new WriterException("must set chukwaCollector.pipeline");
+ throw new WriterException("must set chukwa.pipeline");
}
}
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java Mon Jun 30 02:13:10 2014
@@ -20,10 +20,8 @@ package org.apache.hadoop.chukwa.datacol
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.regex.PatternSyntaxException;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.util.Filter;
-import org.apache.hadoop.chukwa.util.RegexUtil;
import org.apache.hadoop.chukwa.util.RegexUtil.CheckedPatternSyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
@@ -80,8 +78,8 @@ public class SocketTeeWriter extends Pip
class SocketListenThread extends Thread {
ServerSocket s;
public SocketListenThread(Configuration conf) throws IOException {
- int portno = conf.getInt("chukwaCollector.tee.port", DEFAULT_PORT);
- USE_KEEPALIVE = conf.getBoolean("chukwaCollector.tee.keepalive", true);
+ int portno = conf.getInt("chukwa.tee.port", DEFAULT_PORT);
+ USE_KEEPALIVE = conf.getBoolean("chukwa.tee.keepalive", true);
s = new ServerSocket(portno);
setDaemon(true);
}
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java Mon Jun 30 02:13:10 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.datacollection.writer.hbase;
+import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
@@ -38,8 +39,12 @@ import org.apache.hadoop.chukwa.util.Dae
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
@@ -55,7 +60,7 @@ public class HBaseWriter extends Pipelin
private Reporter reporter;
private ChukwaConfiguration conf;
String defaultProcessor;
- private HTablePool pool;
+ private HConnection connection;
private Configuration hconf;
private class StatReportingTask extends TimerTask {
@@ -77,20 +82,20 @@ public class HBaseWriter extends Pipelin
}
};
- public HBaseWriter() {
+ public HBaseWriter() throws IOException {
this(true);
}
- public HBaseWriter(boolean reportStats) {
+ public HBaseWriter(boolean reportStats) throws IOException {
/* HBase Version >= 0.89.x */
this(reportStats, new ChukwaConfiguration(), HBaseConfiguration.create());
}
- public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) {
+ public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) throws IOException {
this(true, conf, hconf);
}
- private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) {
+ private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) throws IOException {
this.reportStats = reportStats;
this.conf = conf;
this.hconf = hconf;
@@ -99,7 +104,16 @@ public class HBaseWriter extends Pipelin
"chukwa.demux.mapper.default.processor",
"org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
Demux.jobConf = conf;
- log.info("hbase.zookeeper.quorum: " + hconf.get("hbase.zookeeper.quorum"));
+ log.info("hbase.zookeeper.quorum: " + hconf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + hconf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
+ if (reportStats) {
+ statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
+ }
+ output = new OutputCollector();
+ reporter = new Reporter();
+ if(conf.getBoolean("hbase.writer.verify.schema", false)) {
+ verifyHbaseSchema();
+ }
+ connection = HConnectionManager.createConnection(hconf);
}
public void close() {
@@ -109,15 +123,6 @@ public class HBaseWriter extends Pipelin
}
public void init(Configuration conf) throws WriterException {
- if (reportStats) {
- statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
- }
- output = new OutputCollector();
- reporter = new Reporter();
- if(conf.getBoolean("hbase.writer.verify.schema", false)) {
- verifyHbaseSchema();
- }
- pool = new HTablePool(hconf, 60);
}
private boolean verifyHbaseTable(HBaseAdmin admin, Table table) {
@@ -188,12 +193,12 @@ public class HBaseWriter extends Pipelin
Table table = findHBaseTable(chunk.getDataType());
if(table!=null) {
- HTableInterface hbase = pool.getTable(table.name().getBytes());
+ HTableInterface hbase = connection.getTable(table.name());
MapProcessor processor = getProcessor(chunk.getDataType());
processor.process(new ChukwaArchiveKey(), chunk, output, reporter);
-
hbase.put(output.getKeyValues());
- pool.putTable(hbase);
+ } else {
+ log.warn("Error finding HBase table for data type:"+chunk.getDataType());
}
} catch (Exception e) {
log.warn(output.getKeyValues());
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/TestOffsetStatsManager.java Mon Jun 30 02:13:10 2014
@@ -43,8 +43,8 @@ public class TestOffsetStatsManager exte
// calculate 5 second average
double rate = statsManager.calcAverageRate(dummyKey, 5);
- assertTrue("Invalid average, expected about 1 byte/sec, found " + rate,
- Math.abs(1000 - rate) < 1.5);
+ assertTrue("Invalid average, expected about 1 kbyte/sec, found " + rate,
+ Math.abs(rate / 1000) <= 1);
}
public void testCalcAverageRateStaleData() throws InterruptedException {
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSyslogAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSyslogAdaptor.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSyslogAdaptor.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/TestSyslogAdaptor.java Mon Jun 30 02:13:10 2014
@@ -42,7 +42,7 @@ public class TestSyslogAdaptor extends T
DatagramPacket p = new DatagramPacket(buf, buf.length);
p.setSocketAddress(new InetSocketAddress("127.0.0.1",u.portno));
send.send(p);
-
+ send.close();
synchronized(this) {
wait(1000);
}
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java Mon Jun 30 02:13:10 2014
@@ -40,7 +40,7 @@ public class TestLogRotate extends TestC
public void testLogRotate() throws IOException, InterruptedException,
ChukwaAgent.AlreadyRunningException {
- ChukwaAgent agent = new ChukwaAgent();
+ ChukwaAgent agent = ChukwaAgent.getAgent();
// Remove any adaptor left over from previous run
ChukwaConfiguration cc = new ChukwaConfiguration();
int portno = cc.getInt("chukwaAgent.control.port", 9093);
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java Mon Jun 30 02:13:10 2014
@@ -21,33 +21,62 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.Map;
+
import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
import org.apache.hadoop.conf.Configuration;
+
import junit.framework.TestCase;
+
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
import org.apache.log4j.Level;
+import org.mortbay.log.Log;
public class TestRCheckAdaptor extends TestCase implements ChunkReceiver {
ChunkCatcherConnector chunks;
+ ChukwaAgent agent;
public TestRCheckAdaptor() {
chunks = new ChunkCatcherConnector();
chunks.start();
}
+ @Override
+ protected void setUp() throws InterruptedException {
+ Configuration conf = new ChukwaConfiguration();
+ conf.set("", "org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector");
+ try {
+ agent = new ChukwaAgent(conf);
+ Thread.sleep(2000);
+ Map<String, String> adaptorList = agent.getAdaptorList();
+ for(String id : adaptorList.keySet()) {
+ agent.stopAdaptor(id, false);
+ }
+ } catch (AlreadyRunningException e) {
+ fail("Agent is already running.");
+ }
+ }
+
+ @Override
+ protected void tearDown() throws InterruptedException {
+ agent.shutdown();
+ Thread.sleep(2000);
+ }
+
public void testBaseCases() throws IOException, InterruptedException,
ChukwaAgent.AlreadyRunningException {
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
-// RCheckFTAdaptor.log.setLevel(Level.DEBUG);
File baseDir = new File(System.getProperty("test.build.data", "/tmp") + "/rcheck");
TestDirTailingAdaptor.createEmptyDir(baseDir);
File tmpOutput = new File(baseDir, "rotateTest.1");
@@ -60,16 +89,14 @@ public class TestRCheckAdaptor extends T
pw.println("Second");
pw.close();
-
- ChukwaAgent agent = new ChukwaAgent(conf);
String adaptorID = agent.processAddCommand("add lr = filetailer.RCheckFTAdaptor test " + tmpOutput.getAbsolutePath() + " 0");
assertNotNull(adaptorID);
- Chunk c = chunks.waitForAChunk(2000);
+ Chunk c = chunks.waitForAChunk();
assertNotNull(c);
assertTrue(c.getData().length == 6);
assertTrue("First\n".equals(new String(c.getData())));
- c = chunks.waitForAChunk(2000);
+ c = chunks.waitForAChunk();
assertNotNull(c);
assertTrue(c.getData().length == 7);
assertTrue("Second\n".equals(new String(c.getData())));
@@ -77,7 +104,7 @@ public class TestRCheckAdaptor extends T
pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
pw.println("Third");
pw.close();
- c = chunks.waitForAChunk(2000);
+ c = chunks.waitForAChunk();
assertNotNull(c);
assertTrue(c.getData().length == 6);
@@ -88,7 +115,7 @@ public class TestRCheckAdaptor extends T
pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
pw.println("Fourth");
pw.close();
- c = chunks.waitForAChunk(2000);
+ c = chunks.waitForAChunk();
assertNotNull(c);
System.out.println("got " + new String(c.getData()));
@@ -101,22 +128,19 @@ public class TestRCheckAdaptor extends T
pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
pw.println("Fifth");
pw.close();
- c = chunks.waitForAChunk(2000);
+ c = chunks.waitForAChunk();
assertNotNull(c);
System.out.println("got " + new String(c.getData()));
assertTrue("Fifth\n".equals(new String(c.getData())));
- agent.shutdown();
- Thread.sleep(2000);
}
-
public void testContinuously() throws Exception {
File baseDir = new File(System.getProperty("test.build.data", "/tmp") + "/rcheck");
TestDirTailingAdaptor.createEmptyDir(baseDir);
File tmpOutput = new File(baseDir, "continuousTest");
PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
- LWFTAdaptor.tailer.SAMPLE_PERIOD_MS = 2000;
+ //LWFTAdaptor.tailer.SAMPLE_PERIOD_MS = 2000;
// RCheckFTAdaptor.log.setLevel(Level.DEBUG);
RCheckFTAdaptor rca = new RCheckFTAdaptor();
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/rest/TestAdaptorController.java Mon Jun 30 02:13:10 2014
@@ -49,7 +49,7 @@ public class TestAdaptorController exten
StringBuilder sb;
protected void setUp() throws Exception {
- agent = new ChukwaAgent();
+ agent = ChukwaAgent.getAgent();
ServletHolder servletHolder = new ServletHolder(ServletContainer.class);
servletHolder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/connector/TestFailedCollector.java Mon Jun 30 02:13:10 2014
@@ -31,7 +31,7 @@ public class TestFailedCollector extends
public void testFailedCollector() {
try {
- ChukwaAgent agent = new ChukwaAgent();
+ ChukwaAgent agent = ChukwaAgent.getAgent();
boolean failed = false;
HttpConnector connector = new HttpConnector(agent,
"http://localhost:1234/chukwa");
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestHBaseWriter.java Mon Jun 30 02:13:10 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.datacollection.writer;
+import java.io.IOException;
import java.util.ArrayList;
import junit.framework.Assert;
@@ -28,11 +29,11 @@ import org.apache.hadoop.chukwa.ChunkImp
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -52,36 +53,33 @@ public class TestHBaseWriter extends Tes
private byte[] table = Bytes.toBytes("Test");
private byte[] test = Bytes.toBytes("1234567890 Key Value");
private ChukwaConfiguration cc;
+ private LocalHBaseCluster cluster;
long timestamp = 1234567890;
public TestHBaseWriter() {
cc = new ChukwaConfiguration();
-
- conf = HBaseConfiguration.create();
- conf.set("hbase.hregion.memstore.flush.size", String.valueOf(128*1024));
+ }
+
+ public void setUp() {
try {
- util = new HBaseTestingUtility(conf);
+ util = new HBaseTestingUtility();
util.startMiniZKCluster();
- util.getConfiguration().setBoolean("dfs.support.append", true);
- util.startMiniCluster(2);
- HTableDescriptor desc = new HTableDescriptor();
+ util.startMiniCluster();
+ conf = util.getConfiguration();
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("Test"));
HColumnDescriptor family = new HColumnDescriptor(columnFamily);
- desc.setName(table);
desc.addFamily(family);
util.getHBaseAdmin().createTable(desc);
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
- }
- }
-
- public void setup() {
-
+ }
}
- public void tearDown() {
-
+ public void tearDown() throws Exception {
+ util.shutdownMiniCluster();
+ util.shutdownMiniZKCluster();
}
public void testWriters() {
@@ -90,7 +88,6 @@ public class TestHBaseWriter extends Tes
try {
cc.set("hbase.demux.package", "org.apache.chukwa.datacollection.writer.test.demux");
cc.set("TextParser","org.apache.hadoop.chukwa.datacollection.writer.test.demux.TextParser");
- conf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1");
hbw = new HBaseWriter(cc, conf);
hbw.init(cc);
if(hbw.add(chunks)!=ChukwaWriter.COMMIT_OK) {
@@ -103,8 +100,8 @@ public class TestHBaseWriter extends Tes
}
// Cleanup and return
scanner.close();
+ testTable.close();
// Compare data in Hbase with generated chunks
- util.shutdownMiniCluster();
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java Mon Jun 30 02:13:10 2014
@@ -32,11 +32,11 @@ public class TestSocketTee extends Test
Configuration conf = new Configuration();
- conf.set("chukwaCollector.pipeline",
+ conf.set("chukwa.pipeline",
SocketTeeWriter.class.getCanonicalName()+","// note comma
+ CaptureWriter.class.getCanonicalName());
- conf.set("chukwaCollector.writerClass",
+ conf.set("chukwa.writerClass",
PipelineStageWriter.class.getCanonicalName());
PipelineStageWriter psw = new PipelineStageWriter();
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/dataloader/TestSocketDataLoader.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/dataloader/TestSocketDataLoader.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/dataloader/TestSocketDataLoader.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/dataloader/TestSocketDataLoader.java Mon Jun 30 02:13:10 2014
@@ -39,10 +39,10 @@ public class TestSocketDataLoader exten
Configuration conf = new Configuration();
- conf.set("chukwaCollector.pipeline",
+ conf.set("chukwa.pipeline",
SocketTeeWriter.class.getCanonicalName());
- conf.set("chukwaCollector.writerClass",
+ conf.set("chukwa.writerClass",
PipelineStageWriter.class.getCanonicalName());
PipelineStageWriter psw = new PipelineStageWriter();
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/rest/resource/TestClientTrace.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/rest/resource/TestClientTrace.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/rest/resource/TestClientTrace.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/rest/resource/TestClientTrace.java Mon Jun 30 02:13:10 2014
@@ -42,8 +42,8 @@ public class TestClientTrace extends Set
SocketTeeWriter.class.getCanonicalName());
conf.set("chukwaCollector.writerClass",
PipelineStageWriter.class.getCanonicalName());
- PipelineStageWriter psw = new PipelineStageWriter();
try {
+ PipelineStageWriter psw = new PipelineStageWriter();
psw.init(conf);
// Send a client trace chunk
ArrayList<Chunk> l = new ArrayList<Chunk>();
Modified: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java?rev=1606617&r1=1606616&r2=1606617&view=diff
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java (original)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/tools/backfilling/TestBackfillingLoader.java Mon Jun 30 02:13:10 2014
@@ -40,6 +40,8 @@ import org.apache.hadoop.io.SequenceFile
public class TestBackfillingLoader extends TestCase{
+ private String cluster = "chukwa";
+
public void testBackfillingLoaderWithCharFileTailingAdaptorUTF8NewLineEscaped() {
String tmpDir = System.getProperty("test.build.data", "/tmp");
long ts = System.currentTimeMillis();
@@ -50,7 +52,7 @@ public class TestBackfillingLoader exten
conf.set("chukwaCollector.outputDir", dataDir + "/log/");
conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
- String cluster = "MyCluster_" + ts;
+ String cluster = "chukwa";
String machine = "machine_" + ts;
String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
String recordType = "MyRecordType_" + ts;
@@ -110,7 +112,6 @@ public class TestBackfillingLoader exten
conf.set("chukwaCollector.outputDir", dataDir + "/log/");
conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
- String cluster = "MyCluster_" + ts;
String machine = "machine_" + ts;
String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor";
String recordType = "MyRecordType_" + ts;
@@ -173,7 +174,6 @@ public class TestBackfillingLoader exten
conf.set("chukwaCollector.rotateInterval", "" + (Integer.MAX_VALUE -1));
- String cluster = "MyCluster_" + ts;
String machine = "machine_" + ts;
String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
String recordType = "MyRecordType_" + ts;
@@ -237,7 +237,6 @@ public class TestBackfillingLoader exten
conf.set("chukwaCollector.writerClass", "org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter");
conf.set("chukwaCollector.minPercentFreeDisk", "2");//so unit tests pass on machines with full-ish disks
- String cluster = "MyCluster_" + ts;
String machine = "machine_" + ts;
String adaptorName = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
String recordType = "MyRecordType_" + ts;
@@ -303,6 +302,9 @@ public class TestBackfillingLoader exten
while (reader.next(key, chunk)) {
+ System.out.println("cluster:" + cluster);
+ System.out.println("cluster:" + RecordUtil.getClusterName(chunk));
+
Assert.assertTrue(cluster.equals(RecordUtil.getClusterName(chunk)));
Assert.assertTrue(dataType.equals(chunk.getDataType()));
Assert.assertTrue(source.equals(chunk.getSource()));