You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by ey...@apache.org on 2010/04/25 20:02:40 UTC
svn commit: r937851 - in /hadoop/chukwa/trunk: CHANGES.txt
src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestTsProcessor.java
Author: eyang
Date: Sun Apr 25 18:02:40 2010
New Revision: 937851
URL: http://svn.apache.org/viewvc?rev=937851&view=rev
Log:
CHUKWA-472. Added ability to configure timestamp format for the default Time Series Processor. (Bill Graham via Eric Yang)
Added:
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestTsProcessor.java
Modified:
hadoop/chukwa/trunk/CHANGES.txt
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=937851&r1=937850&r2=937851&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Sun Apr 25 18:02:40 2010
@@ -10,6 +10,8 @@ Trunk (unreleased changes)
CHUKWA-471. Expose JobConf to Demux Processors. (Jerome Boulon via asrabkin)
+ CHUKWA-472. Added ability to configure timestamp format for the default Time Series Processor. (Bill Graham via Eric Yang)
+
BUG FIXES
Chukwa 0.4
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java?rev=937851&r1=937850&r2=937851&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TsProcessor.java Sun Apr 25 18:02:40 2010
@@ -22,19 +22,54 @@ import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.demux.Demux;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
+/**
+ * TsProcessor is a generic processor that can be configured to find the timestamp
+ * in the text of a record. By default, this class expects that a record
+ * starts with a date in this format: <code>yyyy-MM-dd HH:mm:ss,SSS</code>
+ * <P>
+ * This format can be changed with the following configurations.
+ * <UL>
+ * <LI><code>TsProcessor.default.time.format</code> - Changes the default time
+ * format used by all data types.</LI>
+ * <LI><code>TsProcessor.time.format.[some_data_type]</code> - Overrides the default
+ * format for a specific data type.</LI>
+ * </UL>
+ * If the time string is not at the beginning of the record you can configure a
+ * regular expression to locate the timestamp text with either of the following
+ * configurations. The regular expression must match the entire record, and
+ * the text captured by group 1 of the match will be used with the configured
+ * date format.
+ * <UL>
+ * <LI><code>TsProcessor.default.time.regex</code> - Changes the default time
+ * location regex of the time text for all data types.</LI>
+ * <LI><code>TsProcessor.time.regex.[some_data_type]</code> - Overrides the
+ * default time location regex for a specific data type.</LI>
+ * </UL>
+ *
+ */
public class TsProcessor extends AbstractProcessor {
static Logger log = Logger.getLogger(TsProcessor.class);
- private SimpleDateFormat sdf = null;
+
+ public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
+
+ private Map<String, Pattern> datePatternMap;
+ private Map<String, SimpleDateFormat> dateFormatMap;
public TsProcessor() {
- // TODO move that to config
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ datePatternMap = new HashMap<String, Pattern>();
+ dateFormatMap = new HashMap<String, SimpleDateFormat>();
// StringBuilder format = new StringBuilder();
// format.append("TsProcessor.time.format");
@@ -51,7 +86,25 @@ public class TsProcessor extends Abstrac
OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
throws Throwable {
try {
- String dStr = recordEntry.substring(0, 23);
+ SimpleDateFormat sdf = fetchDateFormat(chunk.getDataType());
+ Pattern datePattern = fetchDateLocationPattern(chunk.getDataType());
+ String dStr = null;
+
+ // fetch the part of the record that contains the date.
+ if(datePattern != null) {
+ Matcher m = datePattern.matcher(recordEntry);
+ if (!m.matches() || m.groupCount() < 1) {
+ throw new ParseException("Regex " + datePattern +
+ " couldn't extract date string from record: " + recordEntry, 0);
+ }
+ else {
+ dStr = m.group(1);
+ }
+ }
+ else {
+ dStr = recordEntry.substring(0, sdf.toPattern().length());
+ }
+
Date d = sdf.parse(dStr);
ChukwaRecord record = new ChukwaRecord();
this.buildGenericRecord(record, recordEntry, d.getTime(), chunk
@@ -70,5 +123,56 @@ public class TsProcessor extends Abstrac
}
}
+
+  /**
+   * Returns the SimpleDateFormat to use for the given data type, creating and
+   * caching it on first use.
+   * <P>
+   * The format string is resolved from <code>Demux.jobConf</code>, most
+   * specific setting first: <code>TsProcessor.time.format.[dataType]</code>,
+   * then <code>TsProcessor.default.time.format</code>, then
+   * {@link #DEFAULT_DATE_FORMAT}. When no job configuration is available the
+   * built-in default format is used.
+   *
+   * @param dataType the data type whose timestamp format is wanted
+   * @return the cached or newly created formatter for <code>dataType</code>
+   */
+  private SimpleDateFormat fetchDateFormat(String dataType) {
+    SimpleDateFormat sdf = dateFormatMap.get(dataType);
+    if (sdf != null) {
+      return sdf;
+    }
+
+    JobConf jobConf = Demux.jobConf;
+    String dateFormat = DEFAULT_DATE_FORMAT;
+
+    if (jobConf != null) {
+      dateFormat = jobConf.get("TsProcessor.default.time.format", dateFormat);
+      // Build the key from the dataType argument (not chunk.getDataType()) so
+      // the configuration lookup always matches the key the result is cached
+      // under.
+      dateFormat = jobConf.get("TsProcessor.time.format." + dataType,
+          dateFormat);
+    }
+
+    sdf = new SimpleDateFormat(dateFormat);
+    dateFormatMap.put(dataType, sdf);
+
+    return sdf;
+  }
+
+  /**
+   * Returns the Pattern used to locate the date portion of a record of the
+   * given data type, resolving and caching it on first use.
+   * <P>
+   * The regular expression is resolved from <code>Demux.jobConf</code>, most
+   * specific setting first: <code>TsProcessor.time.regex.[dataType]</code>,
+   * then <code>TsProcessor.default.time.regex</code>.
+   *
+   * @param dataType the data type whose date location regex is wanted
+   * @return the compiled pattern, or <code>null</code> when no regex is
+   *         configured for <code>dataType</code>
+   */
+  private Pattern fetchDateLocationPattern(String dataType) {
+    // containsKey (rather than a null check on get) is deliberate: a null
+    // pattern is cached too, so "no regex configured" is resolved only once.
+    if (datePatternMap.containsKey(dataType)) {
+      return datePatternMap.get(dataType);
+    }
+
+    JobConf jobConf = Demux.jobConf;
+    String datePattern = null;
+
+    if (jobConf != null) {
+      datePattern = jobConf.get("TsProcessor.default.time.regex", null);
+      // Build the key from the dataType argument (not chunk.getDataType()) so
+      // the configuration lookup always matches the key the result is cached
+      // under.
+      datePattern = jobConf.get("TsProcessor.time.regex." + dataType,
+          datePattern);
+    }
+
+    Pattern pattern = datePattern != null ? Pattern.compile(datePattern) : null;
+    datePatternMap.put(dataType, pattern);
+
+    return pattern;
+  }
}
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestTsProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestTsProcessor.java?rev=937851&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestTsProcessor.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/TestTsProcessor.java Sun Apr 25 18:02:40 2010
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkBuilder;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.demux.Demux;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.Map;
+import java.util.Date;
+import java.util.Calendar;
+import java.text.SimpleDateFormat;
+
+/**
+ * Tests that TsProcessor extracts the record timestamp using the default
+ * format as well as formats and location regexes supplied through the
+ * <code>TsProcessor.*</code> job configuration keys.
+ */
+public class TestTsProcessor extends TestCase {
+
+  private static final String DATA_TYPE = "testDataType";
+  private static final String DATA_SOURCE = "testDataSource";
+
+  /** Non-default format used by the tests where the date starts the record. */
+  private static final String CUSTOM_FORMAT = "yyyy--MM--dd HH::mm::ss SSS";
+
+  /** Date format used inside the sample Apache-style record. */
+  private static final String APACHE_FORMAT = "dd/MMM/yyyy:HH:mm:ss Z";
+
+  /** Regex whose group 1 captures the bracketed date of the Apache record. */
+  private static final String APACHE_REGEX =
+      "^(?:[\\d.]+) \\[(\\d{2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2} [-+]\\d{4})\\] .*";
+
+  JobConf jobConf = null;
+
+  Date date = null;
+  Date dateWithoutMillis = null;
+
+  /**
+   * Installs a fresh JobConf into Demux and captures the current time, both
+   * with and without its millisecond component.
+   */
+  protected void setUp() throws Exception {
+    jobConf = new JobConf();
+    Demux.jobConf = jobConf;
+    date = new Date();
+
+    // If a format doesn't contain millis, the final record date won't have
+    // them either, so keep a millis-free copy of the sample date for those
+    // tests' assertions.
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTime(date);
+    calendar.set(Calendar.MILLISECOND, 0);
+    dateWithoutMillis = calendar.getTime();
+  }
+
+  /** No configuration: the record starts with the built-in default format. */
+  public void testDefaultFormat() {
+    String record = buildSampleSimpleRecord(date, "yyyy-MM-dd HH:mm:ss,SSS");
+    doTest(date, record);
+  }
+
+  /** The default format is overridden for all data types. */
+  public void testCustomDefaultFormat() {
+    jobConf.set("TsProcessor.default.time.format", CUSTOM_FORMAT);
+
+    String record = buildSampleSimpleRecord(date, CUSTOM_FORMAT);
+    doTest(date, record);
+  }
+
+  /** The format is configured only for the data type under test. */
+  public void testCustomDataTypeFormat() {
+    jobConf.set("TsProcessor.time.format." + DATA_TYPE, CUSTOM_FORMAT);
+
+    String record = buildSampleSimpleRecord(date, CUSTOM_FORMAT);
+    doTest(date, record);
+  }
+
+  /** A data type specific format takes precedence over a custom default. */
+  public void testCustomDefaultFormatWithCustomDataTypeFormat() {
+    jobConf.set("TsProcessor.default.time.format", "yyyy/MM/dd HH:mm:ss SSS");
+    jobConf.set("TsProcessor.time.format." + DATA_TYPE, CUSTOM_FORMAT);
+
+    String record = buildSampleSimpleRecord(date, CUSTOM_FORMAT);
+    doTest(date, record);
+  }
+
+  /** Default format plus default regex locate a date inside the record. */
+  public void testCustomApacheDefaultFormat() {
+    jobConf.set("TsProcessor.default.time.format", APACHE_FORMAT);
+    jobConf.set("TsProcessor.default.time.regex", APACHE_REGEX);
+
+    String record = buildSampleApacheRecord(dateWithoutMillis, APACHE_FORMAT);
+    doTest(dateWithoutMillis, record);
+  }
+
+  /** Data type specific format and regex locate a date inside the record. */
+  public void testCustomApacheDataTypeFormat() {
+    jobConf.set("TsProcessor.time.format." + DATA_TYPE, APACHE_FORMAT);
+    jobConf.set("TsProcessor.time.regex." + DATA_TYPE, APACHE_REGEX);
+
+    String record = buildSampleApacheRecord(dateWithoutMillis, APACHE_FORMAT);
+    doTest(dateWithoutMillis, record);
+  }
+
+  /** Builds a record that begins with the date rendered in dateFormat. */
+  private static String buildSampleSimpleRecord(Date date, String dateFormat) {
+    SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
+    return sdf.format(date) + " some sample record data";
+  }
+
+  /** Builds an Apache-style record with the date in square brackets. */
+  private static String buildSampleApacheRecord(Date date, String dateFormat) {
+    SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
+    return "10.10.182.49 [" + sdf.format(date) +
+        "] \"\" 200 \"-\" \"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.5; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3\" \"some.site.com:8076\"";
+  }
+
+  /**
+   * Runs a single-record chunk through a new TsProcessor and asserts that
+   * exactly one record is emitted under the expected key, carrying the
+   * expected time and the unmodified record text as its body.
+   *
+   * @param date the timestamp the processor is expected to extract
+   * @param recordData the raw record text to process
+   */
+  public void doTest(Date date, String recordData) {
+    ChunkBuilder cb = new ChunkBuilder();
+    cb.addRecord(recordData.getBytes());
+
+    Chunk chunk = cb.getChunk();
+    chunk.setDataType(DATA_TYPE);
+    chunk.setSource(DATA_SOURCE);
+
+    ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
+        new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
+
+    TsProcessor p = new TsProcessor();
+    p.reset(chunk);
+    p.process(null, chunk, output, Reporter.NULL);
+
+    ChukwaRecordKey key = buildKey(date, DATA_SOURCE, DATA_TYPE);
+    Map<ChukwaRecordKey, ChukwaRecord> outputData = output.data;
+
+    assertNotNull("No output data found.", outputData);
+    assertEquals("Output data size not correct.", 1, outputData.size());
+
+    ChukwaRecord record = outputData.get(key);
+    assertNotNull("Output record not found.", record);
+    assertEquals("Output record time not correct.", date.getTime(), record.getTime());
+    assertEquals("Output record body not correct.", recordData,
+        new String(record.getMapFields().get("body").get()));
+  }
+
+  /**
+   * Builds the key the output record is expected under:
+   * <code>[date truncated to the hour, in millis]/[dataSource]/[date in millis]</code>
+   * with the data type as the reduce type.
+   */
+  private static ChukwaRecordKey buildKey(Date date, String dataSource, String dataType) {
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTime(date);
+    calendar.set(Calendar.MINUTE, 0);
+    calendar.set(Calendar.SECOND, 0);
+    calendar.set(Calendar.MILLISECOND, 0);
+
+    ChukwaRecordKey key = new ChukwaRecordKey();
+    key.setKey(calendar.getTimeInMillis() + "/" + dataSource + "/" + date.getTime());
+    key.setReduceType(dataType);
+
+    return key;
+  }
+
+}
\ No newline at end of file