Posted to commits@chukwa.apache.org by as...@apache.org on 2010/04/29 19:30:42 UTC

svn commit: r939397 - in /hadoop/chukwa/trunk: CHANGES.txt src/java/org/apache/hadoop/chukwa/util/CreateRecordFile.java

Author: asrabkin
Date: Thu Apr 29 17:30:42 2010
New Revision: 939397

URL: http://svn.apache.org/viewvc?rev=939397&view=rev
Log:
Allow CreateRecordFile to read job confs. Contributed by Bill Graham.
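
In short: CreateRecordFile gains an optional seventh argument naming a
Hadoop configuration file, which is loaded into Demux.jobConf before the
processor runs, so processors that read job settings (see CHUKWA-471)
see the same configuration here as they do inside Demux. A hypothetical
invocation, with made-up file names:

    java org.apache.hadoop.chukwa.util.CreateRecordFile input.txt output.seq \
        testCluster testDataType testStream TsProcessor /tmp/demux-conf.xml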

Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CreateRecordFile.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=939397&r1=939396&r2=939397&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Apr 29 17:30:42 2010
@@ -12,6 +12,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    CHUKWA-480. Allow CreateRecordFile to read job confs. (Bill Graham via asrabkin)
+
     CHUKWA-471. Expose JobConf to Demux Processors. (Jerome Boulon via asrabkin)
 
     CHUKWA-472. Added ability to configure timestamp format for the default Time Series Processor.  (Bill Graham via Eric Yang)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CreateRecordFile.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CreateRecordFile.java?rev=939397&r1=939396&r2=939397&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CreateRecordFile.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/CreateRecordFile.java Thu Apr 29 17:30:42 2010
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor;
+import org.apache.hadoop.chukwa.extraction.demux.Demux;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.ChunkImpl;
@@ -30,6 +31,8 @@ import org.apache.hadoop.chukwa.ChukwaAr
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 
 import java.io.IOException;
 import java.io.File;
@@ -79,7 +82,7 @@ public class CreateRecordFile {
        archiveKey.setStreamName(chunk.getStreamName());
        archiveKey.setSeqId(chunk.getSeqID());
 
-       processor.process(archiveKey, chunk, collector, null);
+       processor.process(archiveKey, chunk, collector, Reporter.NULL);
        seqFileWriter.append(collector.getChukwaRecordKey(),
                             collector.getChukwaRecord());
      }
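
(Reporter.NULL is Hadoop's built-in no-op Reporter; passing it instead of
null means a processor that calls reporter methods no longer risks a
NullPointerException.)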
@@ -119,16 +122,34 @@ public class CreateRecordFile {
      String dataType = "testDataType";
      String streamName = "testStreamName";               
      MapProcessor processor = new TsProcessor();
+     Path confFile = null;
 
      if (args.length > 2) clusterName = args[2];
      if (args.length > 3) dataType = args[3];
      if (args.length > 4) streamName = args[4];
 
      if (args.length > 5) {
-       Class clazz = Class.forName(args[5]);
+       Class clazz = null;
+       try {
+         clazz = Class.forName(args[5]);
+       }
+       catch (ClassNotFoundException e) {
+         try {
+           clazz = Class.forName(
+                 "org.apache.hadoop.chukwa.extraction.demux.processor.mapper." + args[5]);
+         }
+         catch (Exception e2) {
+           throw e;
+         }
+       }
        processor = (MapProcessor)clazz.newInstance();
      }
 
+     if (args.length > 6) {
+       confFile = new Path(args[6]);
+       Demux.jobConf = new JobConf(confFile);
+     }
+
      System.out.println("Creating sequence file using the following input:");
      System.out.println("inputFile  : " + inputFile);
      System.out.println("outputFile : " + outputFile);
@@ -136,6 +157,7 @@ public class CreateRecordFile {
      System.out.println("dataType   : " + dataType);
      System.out.println("streamName : " + streamName);
      System.out.println("processor  : " + processor.getClass().getName());
+     System.out.println("confFile   : " + confFile);
 
      makeTestSequenceFile(inputFile, outputFile, clusterName, dataType, streamName, processor);
 
@@ -143,7 +165,7 @@ public class CreateRecordFile {
    }
 
    public static void usage() {
-     System.out.println("Usage: java " + TempFileUtil.class.toString().split(" ")[1] + " <inputFile> <outputFile> [clusterName] [dataType] [streamName] [processorClass]");
+     System.out.println("Usage: java " + TempFileUtil.class.toString().split(" ")[1] + " <inputFile> <outputFile> [<clusterName> <dataType> <streamName> <processorClass> [confFile]]");
      System.out.println("Description: Takes a plain text input file and generates a Hadoop sequence file contaning ChukwaRecordKey,ChukwaRecord entries");
      System.out.println("Parameters: inputFile      - Text input file to read");
      System.out.println("            outputFile     - Sequence file to create");
@@ -151,6 +173,7 @@ public class CreateRecordFile {
      System.out.println("            dataType       - Data type to use in the records");
      System.out.println("            streamName     - Stream name to use in the records");
      System.out.println("            processorClass - Processor class to use. Defaults to TsProcessor");
+     System.out.println("            confFile       - File to use to create the JobConf");
      System.exit(0);
    }
 }
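
For reference, JobConf reads the confFile with the standard Hadoop
configuration loader, so the file is ordinary Hadoop XML. A minimal
sketch, with a hypothetical property name:

    <?xml version="1.0"?>
    <configuration>
      <property>
        <name>example.time.format</name>
        <value>yyyy-MM-dd HH:mm:ss</value>
      </property>
    </configuration>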