You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2010/04/25 20:02:40 UTC

svn commit: r937851 - in /hadoop/chukwa/trunk: CHANGES.txt src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestTsProcessor.java

Author: eyang
Date: Sun Apr 25 18:02:40 2010
New Revision: 937851

URL: http://svn.apache.org/viewvc?rev=937851&view=rev
Log:
CHUKWA-472. Added ability to configure timestamp format for the default Time Series Processor.  (Bill Graham via Eric Yang)

Added:
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestTsProcessor.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=937851&r1=937850&r2=937851&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Sun Apr 25 18:02:40 2010
@@ -10,6 +10,8 @@ Trunk (unreleased changes)
 
     CHUKWA-471. Expose JobConf to Demux Processors. (Jerome Boulon via asrabkin)
 
+    CHUKWA-472. Added ability to configure timestamp format for the default Time Series Processor.  (Bill Graham via Eric Yang)
+
   BUG FIXES
 
 Chukwa 0.4 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java?rev=937851&r1=937850&r2=937851&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java Sun Apr 25 18:02:40 2010
@@ -22,19 +22,54 @@ import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.demux.Demux;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.log4j.Logger;
 
+/**
+ * TsProcessor is a generic processor that can be configured to find the timestamp
+ * in the text of a record. By default, this class expects that a record
+ * starts with a date in this format: <code>yyyy-MM-dd HH:mm:ss,SSS</code>
+ * <P>
+ * This format can be changed with the following configurations.
+ * <UL>
+ * <LI><code>TsProcessor.default.time.format</code> - Changes the default time
+ * format used by all data types.</LI>
+ * <LI><code>TsProcessor.time.format.[some_data_type]</code> - Overrides the default
+ * format for a specific data type.</LI>
+ * </UL>
+ * If the time string is not at the beginning of the record you can configure a
+ * regular expression to locate the timestamp text with either of the following
+ * configurations. The text found in group 1 of the regular expression match
+ * will be used with the configured date format.
+ * <UL>
+ * <LI><code>TsProcessor.default.time.regex</code> - Changes the default time
+ * location regex of the time text for all data types.</LI>
+ * <LI><code>TsProcessor.time.regex.[some_data_type]</code> - Overrides the
+ * default time location regex for a specific data type.</LI>
+ * </UL>
+ *
+ */
 public class TsProcessor extends AbstractProcessor {
   static Logger log = Logger.getLogger(TsProcessor.class);
-  private SimpleDateFormat sdf = null;
+
+  public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
+
+  private Map<String, Pattern> datePatternMap;
+  private Map<String, SimpleDateFormat> dateFormatMap;
 
   public TsProcessor() {
-    // TODO move that to config
-    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+    datePatternMap = new HashMap<String, Pattern>();
+    dateFormatMap = new HashMap<String, SimpleDateFormat>();
 
 //    StringBuilder format = new StringBuilder();
 //    format.append("TsProcessor.time.format");
@@ -51,7 +86,25 @@ public class TsProcessor extends Abstrac
       OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
       throws Throwable {
     try {
-      String dStr = recordEntry.substring(0, 23);
+      SimpleDateFormat sdf = fetchDateFormat(chunk.getDataType());
+      Pattern datePattern = fetchDateLocationPattern(chunk.getDataType());
+      String dStr = null;
+
+      // fetch the part of the record that contains the date.
+      if(datePattern != null) {
+        Matcher m = datePattern.matcher(recordEntry);
+        if (!m.matches() || m.groupCount() < 1) {
+          throw new ParseException("Regex " + datePattern +
+                  " couldn't extract date string from record: " + recordEntry, 0);
+        }
+        else {
+          dStr = m.group(1);
+        }
+      }
+      else {
+        dStr = recordEntry.substring(0, sdf.toPattern().length());
+      }
+
       Date d = sdf.parse(dStr);
       ChukwaRecord record = new ChukwaRecord();
       this.buildGenericRecord(record, recordEntry, d.getTime(), chunk
@@ -70,5 +123,56 @@ public class TsProcessor extends Abstrac
     }
 
   }
+  
+  /**
+   * Returns the SimpleDateFormat for dataType, creating and caching it on first use.
+   * @param dataType chunk data type; used as the cache key and the config key suffix
+   * @return TsProcessor.time.format.[dataType], else the configured default, else DEFAULT_DATE_FORMAT
+   */
+  private SimpleDateFormat fetchDateFormat(String dataType) {
+    if (dateFormatMap.containsKey(dataType)) {
+      return dateFormatMap.get(dataType);
+    }
+
+    JobConf jobConf = Demux.jobConf;
+    String dateFormat = DEFAULT_DATE_FORMAT;
+
+    if (jobConf != null) {
+      dateFormat = jobConf.get("TsProcessor.default.time.format", dateFormat);
+      dateFormat = jobConf.get("TsProcessor.time.format." + dataType,
+                               dateFormat);
+    }
+
+    SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
+    dateFormatMap.put(dataType, sdf);
+
+    return sdf;
+  }
+
+  /**
+   * Returns the cached Pattern whose group 1 captures the date text for dataType.
+   * A null result (also cached) means no location regex is configured for it.
+   * @param dataType chunk data type; used as the cache key and the config key suffix
+   * @return compiled TsProcessor.time.regex.[dataType] or the default regex, or null
+   */
+  private Pattern fetchDateLocationPattern(String dataType) {
+    if (datePatternMap.containsKey(dataType)) {
+      return datePatternMap.get(dataType);
+    }
+
+    JobConf jobConf = Demux.jobConf;
+    String datePattern = null;
+
+    if (jobConf != null) {
+      datePattern = jobConf.get("TsProcessor.default.time.regex", null);
+      datePattern = jobConf.get("TsProcessor.time.regex." + dataType,
+                               datePattern);
+    }
+
+    Pattern pattern = datePattern != null ? Pattern.compile(datePattern) : null;
+    datePatternMap.put(dataType, pattern);
+
+    return pattern;
+  }
 
 }

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestTsProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestTsProcessor.java?rev=937851&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestTsProcessor.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestTsProcessor.java Sun Apr 25 18:02:40 2010
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkBuilder;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.demux.Demux;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.Map;
+import java.util.Date;
+import java.util.Calendar;
+import java.text.SimpleDateFormat;
+
+public class TestTsProcessor extends TestCase {
+
+  private static final String DATA_TYPE = "testDataType";
+  private static final String DATA_SOURCE = "testDataSource";
+
+  private JobConf jobConf = null;
+  private JobConf previousJobConf = null; // Demux.jobConf is static; restored in tearDown()
+  private Date date = null;
+  private Date dateWithoutMillis = null;
+
+  protected void setUp() throws Exception {
+    previousJobConf = Demux.jobConf;
+    jobConf = new JobConf();
+    Demux.jobConf = jobConf;
+    date = new Date();
+
+    // Formats without millis (e.g. the Apache one) can't round-trip them, so
+    // those tests compare against a copy of the date with millis zeroed.
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTime(date);
+    calendar.set(Calendar.MILLISECOND, 0);
+    dateWithoutMillis = calendar.getTime();
+  }
+
+  protected void tearDown() throws Exception {
+    Demux.jobConf = previousJobConf;
+  }
+
+  public void testDefaultFormat() {
+    String record = buildSampleSimpleRecord(date, "yyyy-MM-dd HH:mm:ss,SSS");
+    doTest(date, record);
+  }
+
+  public void testCustomDefaultFormat() {
+    jobConf.set("TsProcessor.default.time.format", "yyyy--MM--dd HH::mm::ss SSS");
+
+    String record = buildSampleSimpleRecord(date, "yyyy--MM--dd HH::mm::ss SSS");
+    doTest(date, record);
+  }
+
+  public void testCustomDataTypeFormat() {
+    jobConf.set("TsProcessor.time.format." + DATA_TYPE, "yyyy--MM--dd HH::mm::ss SSS");
+
+    String record = buildSampleSimpleRecord(date, "yyyy--MM--dd HH::mm::ss SSS");
+    doTest(date, record);
+  }
+
+  public void testCustomDefaultFormatWithCustomDataTypeFormat() {
+    jobConf.set("TsProcessor.default.time.format", "yyyy/MM/dd HH:mm:ss SSS");
+    jobConf.set("TsProcessor.time.format." + DATA_TYPE, "yyyy--MM--dd HH::mm::ss SSS");
+
+    String record = buildSampleSimpleRecord(date, "yyyy--MM--dd HH::mm::ss SSS");
+    doTest(date, record);
+  }
+
+  public void testCustomApacheDefaultFormat() {
+    jobConf.set("TsProcessor.default.time.format", "dd/MMM/yyyy:HH:mm:ss Z");
+    jobConf.set("TsProcessor.default.time.regex",
+            "^(?:[\\d.]+) \\[(\\d{2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2} [-+]\\d{4})\\] .*");
+
+    String record = buildSampleApacheRecord(dateWithoutMillis, "dd/MMM/yyyy:HH:mm:ss Z");
+    doTest(dateWithoutMillis, record);
+  }
+
+  public void testCustomApacheDataTypeFormat() {
+    jobConf.set("TsProcessor.time.format." + DATA_TYPE, "dd/MMM/yyyy:HH:mm:ss Z");
+    jobConf.set("TsProcessor.time.regex." + DATA_TYPE,
+            "^(?:[\\d.]+) \\[(\\d{2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2} [-+]\\d{4})\\] .*");
+
+    String record = buildSampleApacheRecord(dateWithoutMillis, "dd/MMM/yyyy:HH:mm:ss Z");
+    doTest(dateWithoutMillis, record);
+  }
+
+  private static String buildSampleSimpleRecord(Date date, String dateFormat) {
+    return new SimpleDateFormat(dateFormat).format(date) + " some sample record data";
+  }
+
+  private static String buildSampleApacheRecord(Date date, String dateFormat) {
+    return "10.10.182.49 [" + new SimpleDateFormat(dateFormat).format(date) +
+            "] \"\" 200 \"-\" \"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.5; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3\" \"some.site.com:8076\"";
+  }
+
+  private void doTest(Date date, String recordData) {
+    ChunkBuilder cb = new ChunkBuilder();
+    cb.addRecord(recordData.getBytes());
+    // Wrap the single record in a chunk tagged with the test data type/source.
+    Chunk chunk = cb.getChunk();
+    chunk.setDataType(DATA_TYPE);
+    chunk.setSource(DATA_SOURCE);
+
+    ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
+            new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
+
+    TsProcessor p = new TsProcessor();
+    p.reset(chunk);
+    p.process(null, chunk, output, Reporter.NULL);
+
+    ChukwaRecordKey key = buildKey(date, DATA_SOURCE, DATA_TYPE);
+    Map<ChukwaRecordKey, ChukwaRecord> outputData = output.data;
+
+    assertNotNull("No output data found.", outputData);
+    assertEquals("Output data size not correct.", 1, outputData.size());
+
+    ChukwaRecord record = outputData.get(key);
+    assertNotNull("Output record not found.", record);
+    assertEquals("Output record time not correct.", date.getTime(), record.getTime());
+    assertEquals("Output record body not correct.", recordData,
+            new String(record.getMapFields().get("body").get()));
+  }
+
+  private static ChukwaRecordKey buildKey(Date date, String dataSource, String dataType) {
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTime(date);
+    calendar.set(Calendar.MINUTE, 0);
+    calendar.set(Calendar.SECOND, 0);
+    calendar.set(Calendar.MILLISECOND, 0);
+    // Expected key layout: <hour-truncated millis>/<source>/<record millis>.
+    ChukwaRecordKey key = new ChukwaRecordKey();
+    key.setKey("" + calendar.getTimeInMillis() + "/" + dataSource + "/" + date.getTime());
+    key.setReduceType(dataType);
+
+    return key;
+  }
+
+}
\ No newline at end of file