You are viewing a plain-text version of this message. The canonical (HTML) version is available from the original mailing-list archive; its hyperlink was lost in this text export.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2013/08/18 04:46:52 UTC
svn commit: r1515060 - in /incubator/chukwa/trunk: ./ conf/
src/main/java/org/apache/hadoop/chukwa/extraction/demux/
src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/
src/test/java/org/apache/hadoop/chukwa/extraction/demux/
Author: eyang
Date: Sun Aug 18 02:46:51 2013
New Revision: 1515060
URL: http://svn.apache.org/r1515060
Log:
CHUKWA-581. Added support for custom reducer package name. (Ivy Tang via Eric Yang)
Modified:
incubator/chukwa/trunk/CHANGES.txt
incubator/chukwa/trunk/conf/chukwa-demux-conf.xml
incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java
incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java
incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java
Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1515060&r1=1515059&r2=1515060&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Sun Aug 18 02:46:51 2013
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
NEW FEATURES
+ CHUKWA-581. Added support for custom reducer package name. (Ivy Tang via Eric Yang)
+
CHUKWA-669. JMX Adaptor. (shreyas subramanya via asrabkin)
CHUKWA-671. Json processors for processing JMX data from Hadoop, HBase and Zookeeper. (shreyas subramanya via asrabkin)
Modified: incubator/chukwa/trunk/conf/chukwa-demux-conf.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/chukwa-demux-conf.xml?rev=1515060&r1=1515059&r2=1515060&view=diff
==============================================================================
--- incubator/chukwa/trunk/conf/chukwa-demux-conf.xml (original)
+++ incubator/chukwa/trunk/conf/chukwa-demux-conf.xml Sun Aug 18 02:46:51 2013
@@ -105,7 +105,7 @@
<!-- -->
-<!-- Demux configs -->
+<!-- Demux configs for mapper-->
<property>
<name>demux.max.error.count.before.shutdown</name>
@@ -128,49 +128,49 @@
<property>
<name>SysLog</name>
<value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.SysLog</value>
- <description>Parser class for </description>
+ <description>Parser class for SysLog</description>
</property>
<property>
<name>Df</name>
<value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Df</value>
- <description>Parser class for </description>
+ <description>Parser class for Df</description>
</property>
<property>
<name>HadoopLog</name>
<value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.HadoopLogProcessor</value>
- <description>Parser class for </description>
+ <description>Parser class for HadoopLog</description>
</property>
<property>
<name>Iostat</name>
<value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Iostat</value>
- <description>Parser class for </description>
+ <description>Parser class for Iostat</description>
</property>
<property>
<name>PbsNodes</name>
<value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.PbsNodes</value>
- <description>Parser class for </description>
+ <description>Parser class for PbsNodes</description>
</property>
<property>
<name>Sar</name>
<value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Sar</value>
- <description>Parser class for </description>
+ <description>Parser class for Sar</description>
</property>
<property>
<name>TsProcessor</name>
<value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor</value>
- <description>Parser class for </description>
+ <description>Parser class for TsProcessor</description>
</property>
<property>
<name>Top</name>
<value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Top</value>
- <description>Parser class for </description>
+ <description>Parser class for Top</description>
</property>
<property>
@@ -182,7 +182,7 @@
<property>
<name>DbLoader</name>
<value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor</value>
- <description>Parser class for </description>
+ <description>Parser class for DbLoader</description>
</property>
<property>
@@ -255,5 +255,29 @@
<name>HBaseRegionServerProcessor</name>
<value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.HBaseRegionServerProcessor</value>
</property>
+
+ <!-- Demux configs for reducer-->
+ <property>
+ <name>JobLogHistoryReduceProcessor</name>
+ <value>,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.JobLogHistoryReduceProcessor</value>
+ <description> Reducer class for Reduce Type JobLogHistoryReduceProcessor </description>
+ </property>
+
+ <property>
+ <name>SystemMetrics</name>
+ <value>,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.SystemMetrics</value>
+ <description> Reducer class for Reduce Type SystemMetrics </description>
+ </property>
+
+ <property>
+ <name>MRJobReduceProcessor</name>
+ <value>,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.MRJobReduceProcessor</value>
+ <description> Reducer class for Reduce Type MRJobReduceProcessor </description>
+ </property>
+
+ <property>
+ <name>ClientTrace</name>
+ <value>,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ClientTrace</value>
+ <description> Reducer class for Reduce Type ClientTrace </description>
+ </property>
</configuration>
-
Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java?rev=1515060&r1=1515059&r2=1515060&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java Sun Aug 18 02:46:51 2013
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
+
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
@@ -64,53 +65,54 @@ public class Demux extends Configured im
public static Configuration jobConf = null;
public static class MapClass extends MapReduceBase implements
- Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
+ Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
@Override
public void configure(JobConf jobConf) {
super.configure(jobConf);
- Demux.jobConf= jobConf;
+ Demux.jobConf = jobConf;
}
public void map(ChukwaArchiveKey key, ChunkImpl chunk,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
- throws IOException {
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws IOException {
ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
- "DemuxMapOutput", output, reporter);
+ "DemuxMapOutput", output, reporter);
try {
long duration = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("Entry: [" + chunk.getData() + "] EventType: ["
- + chunk.getDataType() + "]");
+ + chunk.getDataType() + "]");
}
String defaultProcessor = Demux.jobConf.get(
- "chukwa.demux.mapper.default.processor",
- "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+ "chukwa.demux.mapper.default.processor",
+ "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
- String processorClass = Demux.jobConf.get(chunk.getDataType(),
+ String processorClass_pri = Demux.jobConf.get(chunk.getDataType(),
defaultProcessor);
+ String processorClass = processorClass_pri.split(",")[0];
if (!processorClass.equalsIgnoreCase("Drop")) {
reporter.incrCounter("DemuxMapInput", "total chunks", 1);
reporter.incrCounter("DemuxMapInput",
- chunk.getDataType() + " chunks", 1);
+ chunk.getDataType() + " chunks", 1);
MapProcessor processor = MapProcessorFactory
- .getProcessor(processorClass);
+ .getProcessor(processorClass);
processor.process(key, chunk, chukwaOutputCollector, reporter);
if (log.isDebugEnabled()) {
duration = System.currentTimeMillis() - duration;
log.debug("Demux:Map dataType:" + chunk.getDataType()
- + " duration:" + duration + " processor:" + processorClass
- + " recordCount:" + chunk.getRecordOffsets().length);
+ + " duration:" + duration + " processor:" + processorClass
+ + " recordCount:" + chunk.getRecordOffsets().length);
}
} else {
log.info("action:Demux, dataType:" + chunk.getDataType()
- + " duration:0 processor:Drop recordCount:"
- + chunk.getRecordOffsets().length);
+ + " duration:0 processor:Drop recordCount:"
+ + chunk.getRecordOffsets().length);
}
} catch (Exception e) {
@@ -121,7 +123,7 @@ public class Demux extends Configured im
}
public static class ReduceClass extends MapReduceBase implements
- Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {
+ Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {
public void configure(JobConf jobConf) {
super.configure(jobConf);
@@ -129,27 +131,37 @@ public class Demux extends Configured im
}
public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
- throws IOException {
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws IOException {
ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
- "DemuxReduceOutput", output, reporter);
+ "DemuxReduceOutput", output, reporter);
try {
long duration = System.currentTimeMillis();
reporter.incrCounter("DemuxReduceInput", "total distinct keys", 1);
reporter.incrCounter("DemuxReduceInput", key.getReduceType()
- + " total distinct keys", 1);
+ + " total distinct keys", 1);
- String defaultProcessor = Demux.jobConf.get(
- "chukwa.demux.reducer.default.processor", null);
- ReduceProcessor processor = ReduceProcessorFactory.getProcessor(
- key.getReduceType(), defaultProcessor);
+ String defaultProcessor_classname = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer" +
+ ".IdentityReducer";
+ String defaultProcessor = Demux.jobConf.get("chukwa.demux.reducer.default.processor",
+ "," + defaultProcessor_classname);
+
+ String processClass_pri = Demux.jobConf.get(key.getReduceType(), defaultProcessor);
+ String[] processClass_tmps = processClass_pri.split(",");
+ String processClass = null;
+ if (processClass_tmps.length != 2)
+ processClass = defaultProcessor_classname;
+ else
+ processClass = processClass_tmps[1];
+ ReduceProcessor processor = ReduceProcessorFactory.getProcessor(processClass);
+ System.out.println(processor.getClass().getName());
processor.process(key, values, chukwaOutputCollector, reporter);
if (log.isDebugEnabled()) {
duration = System.currentTimeMillis() - duration;
log.debug("Demux:Reduce, dataType:" + key.getReduceType()
- + " duration:" + duration);
+ + " duration:" + duration);
}
} catch (Exception e) {
@@ -166,14 +178,14 @@ public class Demux extends Configured im
}
public static void addParsers(Configuration conf) {
- String parserPath = conf.get("chukwa.data.dir")+File.separator+"demux";
+ String parserPath = conf.get("chukwa.data.dir") + File.separator + "demux";
try {
FileSystem fs = FileSystem.get(new Configuration());
FileStatus[] fstatus = fs.listStatus(new Path(parserPath));
- if(fstatus!=null) {
+ if (fstatus != null) {
String hdfsUrlPrefix = conf.get("fs.default.name");
- for(FileStatus parser : fstatus) {
+ for (FileStatus parser : fstatus) {
Path jarPath = new Path(parser.getPath().toString().replace(hdfsUrlPrefix, ""));
log.debug("Adding parser JAR path " + jarPath);
DistributedCache.addFileToClassPath(jarPath, conf);
@@ -183,10 +195,10 @@ public class Demux extends Configured im
log.error(ExceptionUtil.getStackTrace(e));
}
}
-
+
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(new ChukwaConfiguration(), Demux.class);
-
+
conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
conf.setInputFormat(SequenceFileInputFormat.class);
@@ -199,7 +211,7 @@ public class Demux extends Configured im
conf.setOutputFormat(ChukwaRecordOutputFormat.class);
conf.setJobPriority(JobPriority.VERY_HIGH);
addParsers(conf);
-
+
List<String> other_args = new ArrayList<String>();
for (int i = 0; i < args.length; ++i) {
try {
@@ -215,14 +227,14 @@ public class Demux extends Configured im
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from "
- + args[i - 1]);
+ + args[i - 1]);
return printUsage();
}
}
// Make sure there are exactly 2 parameters left.
if (other_args.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: "
- + other_args.size() + " instead of 2.");
+ + other_args.size() + " instead of 2.");
return printUsage();
}
Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java?rev=1515060&r1=1515059&r2=1515060&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java Sun Aug 18 02:46:51 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.chukwa.extract
import java.util.HashMap;
+
import org.apache.log4j.Logger;
public class ReduceProcessorFactory {
@@ -41,52 +42,39 @@ public class ReduceProcessorFactory {
private ReduceProcessorFactory() {
}
- public static ReduceProcessor getProcessor(String reduceType, String defaultProcessor)
- throws UnknownReduceTypeException {
- String path = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer."
- + reduceType;
+ /**
+ * Register a specific parser for a {@link ReduceProcessor} implementation.
+ */
+ public static synchronized void register(String reduceType,
+ ReduceProcessor processor) {
+ log.info("register " + processor.getClass().getName()
+ + " for this recordType :" + reduceType);
if (processors.containsKey(reduceType)) {
- return processors.get(reduceType);
+ throw new DuplicateReduceProcessorException(
+ "Duplicate processor for recordType:" + reduceType);
+ }
+ ReduceProcessorFactory.processors.put(reduceType, processor);
+ }
+
+ public static ReduceProcessor getProcessor(String processorClass) throws UnknownReduceTypeException {
+ if (processors.containsKey(processorClass)) {
+ return processors.get(processorClass);
} else {
ReduceProcessor processor = null;
try {
- processor = (ReduceProcessor) Class.forName(path).getConstructor()
- .newInstance();
+ processor = (ReduceProcessor) Class.forName(processorClass).newInstance();
} catch (ClassNotFoundException e) {
- // ******** WARNING ********
- // If the ReduceProcessor is not there see if there is a configured
- // default processor. If not, fall back on the Identity instead
- if(defaultProcessor != null) {
- processor = getProcessor(defaultProcessor, null);
- }
- else {
- processor = getProcessor("IdentityReducer", null);
- }
- register(reduceType, processor);
- return processor;
+ throw new UnknownReduceTypeException("Unknown reducer class for:" + processorClass, e);
} catch (Exception e) {
throw new UnknownReduceTypeException("error constructing processor", e);
}
-
// TODO using a ThreadSafe/reuse flag to actually decide if we want
// to reuse the same processor again and again
- register(reduceType, processor);
+ register(processorClass, processor);
return processor;
}
- }
- /**
- * Register a specific parser for a {@link ReduceProcessor} implementation.
- */
- public static synchronized void register(String reduceType,
- ReduceProcessor processor) {
- log.info("register " + processor.getClass().getName()
- + " for this recordType :" + reduceType);
- if (processors.containsKey(reduceType)) {
- throw new DuplicateReduceProcessorException(
- "Duplicate processor for recordType:" + reduceType);
- }
- ReduceProcessorFactory.processors.put(reduceType, processor);
}
+
}
Modified: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java?rev=1515060&r1=1515059&r2=1515060&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java (original)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java Sun Aug 18 02:46:51 2013
@@ -44,7 +44,7 @@ public class TestDemuxMapperConfigs exte
JobConf conf = new JobConf();
conf.set("chukwa.demux.mapper.default.processor",
- "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MockMapProcessor");
+ "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MockMapProcessor,");
mapper.configure(conf);
ChunkBuilder cb = new ChunkBuilder();
@@ -60,4 +60,31 @@ public class TestDemuxMapperConfigs exte
assertEquals("MockMapProcessor never invoked - no records found", 1, output.data.size());
assertNotNull("MockMapProcessor never invoked", output.data.get(recordKey));
}
+
+ public void testSetCustomeMapProcessor() throws IOException {
+ Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> mapper =
+ new Demux.MapClass();
+ String custom_DataType = "cus_dt";
+
+ JobConf conf = new JobConf();
+ conf.set(custom_DataType,
+ "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MockMapProcessor,");
+ mapper.configure(conf);
+
+ ChunkBuilder cb = new ChunkBuilder();
+ cb.addRecord(SAMPLE_RECORD_DATA.getBytes());
+ ChunkImpl chunk = (ChunkImpl)cb.getChunk();
+ chunk.setDataType(custom_DataType);
+
+ ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
+ new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
+
+ mapper.map(new ChukwaArchiveKey(), chunk, output, Reporter.NULL);
+ ChukwaRecordKey recordKey = new ChukwaRecordKey("someReduceType", SAMPLE_RECORD_DATA);
+
+ assertEquals("MockMapProcessor never invoked - no records found", 1, output.data.size());
+ assertNotNull("MockMapProcessor never invoked", output.data.get(recordKey));
+ }
+
+
}
Modified: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java?rev=1515060&r1=1515059&r2=1515060&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java (original)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java Sun Aug 18 02:46:51 2013
@@ -39,7 +39,8 @@ public class TestDemuxReducerConfigs ext
new Demux.ReduceClass();
JobConf conf = new JobConf();
- conf.set("chukwa.demux.reducer.default.processor", "MockReduceProcessor");
+ conf.set("chukwa.demux.reducer.default.processor", ",org.apache.hadoop.chukwa.extraction.demux.processor.reducer" +
+ ".MockReduceProcessor");
reducer.configure(conf);
ChukwaRecordKey key = new ChukwaRecordKey("someReduceType", "someKey");
@@ -54,4 +55,27 @@ public class TestDemuxReducerConfigs ext
"MockReduceProcessorValue",
output.data.get(key).getValue("MockReduceProcessorKey"));
}
+
+ public void testSetCustomReducerProcessor() throws IOException {
+ Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> reducer =
+ new Demux.ReduceClass();
+
+ JobConf conf = new JobConf();
+ String cus_reduceType = "someReduceType";
+ conf.set(cus_reduceType, ",org.apache.hadoop.chukwa.extraction.demux.processor.reducer" +
+ ".MockReduceProcessor");
+ reducer.configure(conf);
+
+ ChukwaRecordKey key = new ChukwaRecordKey(cus_reduceType, "someKey");
+ ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
+ new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
+
+ reducer.reduce(key, null, output, Reporter.NULL);
+
+ assertEquals("MockReduceProcessor never invoked - no records found", 1, output.data.size());
+ assertNotNull("MockReduceProcessor never invoked", output.data.get(key));
+ assertEquals("MockReduceProcessor never invoked - key value incorrect",
+ "MockReduceProcessorValue",
+ output.data.get(key).getValue("MockReduceProcessorKey"));
+ }
}
\ No newline at end of file