You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2012/12/20 00:44:00 UTC

svn commit: r1424228 - in /incubator/chukwa/trunk: ./ src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/ src/main/java/org/apache/hadoop/chukwa/extraction/demux/ src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/

Author: eyang
Date: Wed Dec 19 23:44:00 2012
New Revision: 1424228

URL: http://svn.apache.org/viewvc?rev=1424228&view=rev
Log:
CHUKWA-565. Added HBaseWriter support for TsProcessor. (Bill Graham via Eric Yang)

Modified:
    incubator/chukwa/trunk/CHANGES.txt
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java

Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1424228&r1=1424227&r2=1424228&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Wed Dec 19 23:44:00 2012
@@ -30,6 +30,8 @@ Trunk (unreleased changes)
 
   BUGS
 
+    CHUKWA-565. Added HBaseWriter support for TsProcessor. (Bill Graham via Eric Yang)
+
     CHUKWA-677. Added pid file check before starting processes. (Sreepathi Prasanna via Eric Yang)
 
     CHUKWA-676. Exclude temporarily data from release audit tools. (Eric Yang)

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java?rev=1424228&r1=1424227&r2=1424228&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java Wed Dec 19 23:44:00 2012
@@ -29,9 +29,10 @@ import org.apache.hadoop.chukwa.conf.Chu
 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
 import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
-import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.UnknownRecordTypeException;
+import org.apache.hadoop.chukwa.extraction.demux.Demux;
 import org.apache.hadoop.chukwa.util.ClassUtils;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
@@ -52,10 +53,8 @@ public class HBaseWriter extends Pipelin
   final Timer statTimer;
   private OutputCollector output;
   private Reporter reporter;
-  private ChukwaConfiguration conf = new ChukwaConfiguration();
-  String defaultProcessor = conf.get(
-      "chukwa.demux.mapper.default.processor",
-      "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+  private ChukwaConfiguration conf;
+  String defaultProcessor;
   private HTablePool pool;
   private Configuration hconf;
   
@@ -83,27 +82,36 @@ public class HBaseWriter extends Pipelin
   }
 
   public HBaseWriter(boolean reportStats) {
-    this.reportStats = reportStats;
-    statTimer = new Timer();
-    /* HBase Version 0.20.x */
-    //hconf = new HBaseConfiguration();
-    
-    /* HBase Version 0.89.x */
-    hconf = HBaseConfiguration.create();
+    /* HBase Version >= 0.89.x */
+    this(reportStats, new ChukwaConfiguration(), HBaseConfiguration.create());
   }
 
   public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) {
-    this(true);
+    this(true, conf, hconf);
+  }
+
+  private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) {
+    this.reportStats = reportStats;
     this.conf = conf;
     this.hconf = hconf;
+    this.statTimer = new Timer();
+    this.defaultProcessor = conf.get(
+      "chukwa.demux.mapper.default.processor",
+      "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+    Demux.jobConf = conf;
+    log.info("hbase.zookeeper.quorum: " + hconf.get("hbase.zookeeper.quorum"));
   }
 
   public void close() {
-    statTimer.cancel();
+    if (reportStats) {
+      statTimer.cancel();
+    }
   }
 
   public void init(Configuration conf) throws WriterException {
-    statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
+    if (reportStats) {
+      statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
+    }
     output = new OutputCollector();
     reporter = new Reporter();
     if(conf.getBoolean("hbase.writer.verify.schema", false)) {
@@ -175,23 +183,15 @@ public class HBaseWriter extends Pipelin
     CommitStatus rv = ChukwaWriter.COMMIT_OK;
     try {
       for(Chunk chunk : chunks) {
-        String processorClass = conf.get(chunk.getDataType(),
-                defaultProcessor);
         synchronized (this) {
-          MapProcessor processor = MapProcessorFactory.getProcessor(processorClass);
           try {
-            Table table = null;
-            if(processor.getClass().isAnnotationPresent(Table.class)) {
-              table = processor.getClass().getAnnotation(Table.class);
-            } else if(processor.getClass().isAnnotationPresent(Tables.class)) {
-              Tables tables = processor.getClass().getAnnotation(Tables.class);
-              for(Table t : tables.annotations()) {
-                table = t;
-              }
-            }
+            Table table = findHBaseTable(chunk.getDataType());
+
             if(table!=null) {
-              HTableInterface hbase = pool.getTable(table.name().getBytes());  
+              HTableInterface hbase = pool.getTable(table.name().getBytes());
+              MapProcessor processor = getProcessor(chunk.getDataType());
               processor.process(new ChukwaArchiveKey(), chunk, output, reporter);
+
               hbase.put(output.getKeyValues());
               pool.putTable(hbase);
             }
@@ -214,4 +214,30 @@ public class HBaseWriter extends Pipelin
     return rv;
   }
 
+  public Table findHBaseTable(String dataType) throws UnknownRecordTypeException {
+    MapProcessor processor = getProcessor(dataType);
+
+    Table table = null;
+    if(processor.getClass().isAnnotationPresent(Table.class)) {
+      return processor.getClass().getAnnotation(Table.class);
+    } else if(processor.getClass().isAnnotationPresent(Tables.class)) {
+      Tables tables = processor.getClass().getAnnotation(Tables.class);
+      for(Table t : tables.annotations()) {
+        table = t;
+      }
+    }
+
+    return table;
+  }
+
+  public String findHBaseColumnFamilyName(String dataType)
+          throws UnknownRecordTypeException {
+    Table table = findHBaseTable(dataType);
+    return table.columnFamily();
+  }
+
+  private MapProcessor getProcessor(String dataType) throws UnknownRecordTypeException {
+    String processorClass = conf.get(dataType, defaultProcessor);
+    return MapProcessorFactory.getProcessor(processorClass);
+  }
 }

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java?rev=1424228&r1=1424227&r2=1424228&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java Wed Dec 19 23:44:00 2012
@@ -61,7 +61,7 @@ import org.apache.log4j.Logger;
 public class Demux extends Configured implements Tool {
   static Logger log = Logger.getLogger(Demux.class);
   static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
-  public static JobConf jobConf = null;
+  public static Configuration jobConf = null;
 
   public static class MapClass extends MapReduceBase implements
       Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java?rev=1424228&r1=1424227&r2=1424228&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java Wed Dec 19 23:44:00 2012
@@ -34,7 +34,7 @@ import org.apache.hadoop.chukwa.extracti
 import org.apache.hadoop.chukwa.util.RegexUtil;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
 /**
@@ -81,10 +81,10 @@ public class TsProcessor extends Abstrac
   protected void parse(String recordEntry,
       OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
       throws Throwable {
+    String dStr = null;
     try {
       SimpleDateFormat sdf = fetchDateFormat(chunk.getDataType());
       Pattern datePattern = fetchDateLocationPattern(chunk.getDataType());
-      String dStr = null;
 
       // fetch the part of the record that contains the date.
       if(datePattern != null) {
@@ -108,7 +108,7 @@ public class TsProcessor extends Abstrac
       output.collect(key, record);
     } catch (ParseException e) {
       log.warn("Unable to parse the date in DefaultProcessor [" + recordEntry
-          + "]", e);
+          + "], date string='" + dStr + "'", e);
       e.printStackTrace();
       throw e;
     } catch (IOException e) {
@@ -130,7 +130,7 @@ public class TsProcessor extends Abstrac
       return dateFormatMap.get(dataType);
     }
 
-    JobConf jobConf = Demux.jobConf;
+    Configuration jobConf = Demux.jobConf;
     String dateFormat = DEFAULT_DATE_FORMAT;
 
     if (jobConf != null) {
@@ -139,6 +139,7 @@ public class TsProcessor extends Abstrac
                                dateFormat);
     }
 
+    log.info("dataType: " + chunk.getDataType() + ", dateFormat="+ dateFormat);
     SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
     dateFormatMap.put(dataType, sdf);
 
@@ -156,7 +157,7 @@ public class TsProcessor extends Abstrac
       return datePatternMap.get(dataType);
     }
 
-    JobConf jobConf = Demux.jobConf;
+    Configuration jobConf = Demux.jobConf;
     String datePattern = null;
     Pattern pattern = null;