You are viewing a plain text version of this content. (The canonical link to it was a hyperlink that is not preserved in this plain-text copy.)
Posted to commits@chukwa.apache.org by ey...@apache.org on 2012/12/20 00:44:00 UTC
svn commit: r1424228 - in /incubator/chukwa/trunk: ./
src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/
src/main/java/org/apache/hadoop/chukwa/extraction/demux/
src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/
Author: eyang
Date: Wed Dec 19 23:44:00 2012
New Revision: 1424228
URL: http://svn.apache.org/viewvc?rev=1424228&view=rev
Log:
CHUKWA-565. Added HBaseWriter support for TsProcessor. (Bill Graham via Eric Yang)
Modified:
incubator/chukwa/trunk/CHANGES.txt
incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1424228&r1=1424227&r2=1424228&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Wed Dec 19 23:44:00 2012
@@ -30,6 +30,8 @@ Trunk (unreleased changes)
BUGS
+ CHUKWA-565. Added support HBaseWriter support for TsProcessor. (Bill Graham via Eric Yang)
+
CHUKWA-677. Added pid file check before starting processes. (Sreepathi Prasanna via Eric Yang)
CHUKWA-676. Exclude temporarily data from release audit tools. (Eric Yang)
Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java?rev=1424228&r1=1424227&r2=1424228&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java Wed Dec 19 23:44:00 2012
@@ -29,9 +29,10 @@ import org.apache.hadoop.chukwa.conf.Chu
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
-import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.UnknownRecordTypeException;
+import org.apache.hadoop.chukwa.extraction.demux.Demux;
import org.apache.hadoop.chukwa.util.ClassUtils;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
@@ -52,10 +53,8 @@ public class HBaseWriter extends Pipelin
final Timer statTimer;
private OutputCollector output;
private Reporter reporter;
- private ChukwaConfiguration conf = new ChukwaConfiguration();
- String defaultProcessor = conf.get(
- "chukwa.demux.mapper.default.processor",
- "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+ private ChukwaConfiguration conf;
+ String defaultProcessor;
private HTablePool pool;
private Configuration hconf;
@@ -83,27 +82,36 @@ public class HBaseWriter extends Pipelin
}
public HBaseWriter(boolean reportStats) {
- this.reportStats = reportStats;
- statTimer = new Timer();
- /* HBase Version 0.20.x */
- //hconf = new HBaseConfiguration();
-
- /* HBase Version 0.89.x */
- hconf = HBaseConfiguration.create();
+ /* HBase Version >= 0.89.x */
+ this(reportStats, new ChukwaConfiguration(), HBaseConfiguration.create());
}
public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) {
- this(true);
+ this(true, conf, hconf);
+ }
+
+ private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) {
+ this.reportStats = reportStats;
this.conf = conf;
this.hconf = hconf;
+ this.statTimer = new Timer();
+ this.defaultProcessor = conf.get(
+ "chukwa.demux.mapper.default.processor",
+ "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+ Demux.jobConf = conf;
+ log.info("hbase.zookeeper.quorum: " + hconf.get("hbase.zookeeper.quorum"));
}
public void close() {
- statTimer.cancel();
+ if (reportStats) {
+ statTimer.cancel();
+ }
}
public void init(Configuration conf) throws WriterException {
- statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
+ if (reportStats) {
+ statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
+ }
output = new OutputCollector();
reporter = new Reporter();
if(conf.getBoolean("hbase.writer.verify.schema", false)) {
@@ -175,23 +183,15 @@ public class HBaseWriter extends Pipelin
CommitStatus rv = ChukwaWriter.COMMIT_OK;
try {
for(Chunk chunk : chunks) {
- String processorClass = conf.get(chunk.getDataType(),
- defaultProcessor);
synchronized (this) {
- MapProcessor processor = MapProcessorFactory.getProcessor(processorClass);
try {
- Table table = null;
- if(processor.getClass().isAnnotationPresent(Table.class)) {
- table = processor.getClass().getAnnotation(Table.class);
- } else if(processor.getClass().isAnnotationPresent(Tables.class)) {
- Tables tables = processor.getClass().getAnnotation(Tables.class);
- for(Table t : tables.annotations()) {
- table = t;
- }
- }
+ Table table = findHBaseTable(chunk.getDataType());
+
if(table!=null) {
- HTableInterface hbase = pool.getTable(table.name().getBytes());
+ HTableInterface hbase = pool.getTable(table.name().getBytes());
+ MapProcessor processor = getProcessor(chunk.getDataType());
processor.process(new ChukwaArchiveKey(), chunk, output, reporter);
+
hbase.put(output.getKeyValues());
pool.putTable(hbase);
}
@@ -214,4 +214,30 @@ public class HBaseWriter extends Pipelin
return rv;
}
+ public Table findHBaseTable(String dataType) throws UnknownRecordTypeException {
+ MapProcessor processor = getProcessor(dataType);
+
+ Table table = null;
+ if(processor.getClass().isAnnotationPresent(Table.class)) {
+ return processor.getClass().getAnnotation(Table.class);
+ } else if(processor.getClass().isAnnotationPresent(Tables.class)) {
+ Tables tables = processor.getClass().getAnnotation(Tables.class);
+ for(Table t : tables.annotations()) {
+ table = t;
+ }
+ }
+
+ return table;
+ }
+
+ public String findHBaseColumnFamilyName(String dataType)
+ throws UnknownRecordTypeException {
+ Table table = findHBaseTable(dataType);
+ return table.columnFamily();
+ }
+
+ private MapProcessor getProcessor(String dataType) throws UnknownRecordTypeException {
+ String processorClass = conf.get(dataType, defaultProcessor);
+ return MapProcessorFactory.getProcessor(processorClass);
+ }
}
Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java?rev=1424228&r1=1424227&r2=1424228&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java Wed Dec 19 23:44:00 2012
@@ -61,7 +61,7 @@ import org.apache.log4j.Logger;
public class Demux extends Configured implements Tool {
static Logger log = Logger.getLogger(Demux.class);
static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
- public static JobConf jobConf = null;
+ public static Configuration jobConf = null;
public static class MapClass extends MapReduceBase implements
Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java?rev=1424228&r1=1424227&r2=1424228&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java Wed Dec 19 23:44:00 2012
@@ -34,7 +34,7 @@ import org.apache.hadoop.chukwa.extracti
import org.apache.hadoop.chukwa.util.RegexUtil;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
/**
@@ -81,10 +81,10 @@ public class TsProcessor extends Abstrac
protected void parse(String recordEntry,
OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
throws Throwable {
+ String dStr = null;
try {
SimpleDateFormat sdf = fetchDateFormat(chunk.getDataType());
Pattern datePattern = fetchDateLocationPattern(chunk.getDataType());
- String dStr = null;
// fetch the part of the record that contains the date.
if(datePattern != null) {
@@ -108,7 +108,7 @@ public class TsProcessor extends Abstrac
output.collect(key, record);
} catch (ParseException e) {
log.warn("Unable to parse the date in DefaultProcessor [" + recordEntry
- + "]", e);
+ + "], date string='" + dStr + "'", e);
e.printStackTrace();
throw e;
} catch (IOException e) {
@@ -130,7 +130,7 @@ public class TsProcessor extends Abstrac
return dateFormatMap.get(dataType);
}
- JobConf jobConf = Demux.jobConf;
+ Configuration jobConf = Demux.jobConf;
String dateFormat = DEFAULT_DATE_FORMAT;
if (jobConf != null) {
@@ -139,6 +139,7 @@ public class TsProcessor extends Abstrac
dateFormat);
}
+ log.info("dataType: " + chunk.getDataType() + ", dateFormat="+ dateFormat);
SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
dateFormatMap.put(dataType, sdf);
@@ -156,7 +157,7 @@ public class TsProcessor extends Abstrac
return datePatternMap.get(dataType);
}
- JobConf jobConf = Demux.jobConf;
+ Configuration jobConf = Demux.jobConf;
String datePattern = null;
Pattern pattern = null;