You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2010/04/29 21:09:22 UTC
svn commit: r939464 - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/extraction/demux/
src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/
src/test/org/apache/hadoop/chukwa/extraction/demux/
src/test/org/apache/hadoop/c...
Author: asrabkin
Date: Thu Apr 29 19:09:21 2010
New Revision: 939464
URL: http://svn.apache.org/viewvc?rev=939464&view=rev
Log:
CHUKWA-473. Make default processor configurable. Contributed by Bill Graham
Added:
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MockMapProcessor.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MockReduceProcessor.java
Modified:
hadoop/chukwa/trunk/CHANGES.txt
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java
Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=939464&r1=939463&r2=939464&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Apr 29 19:09:21 2010
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
NEW FEATURES
+ CHUKWA-473. Make default processor configurable. (Bill Graham via asrabkin)
+
CHUKWA-479. Support HTTP trigger actions (Bill Graham via asrabkin)
CHUKWA-469. Add JMSAdaptor. (Bill Graham via asrabkin)
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java?rev=939464&r1=939463&r2=939464&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java Thu Apr 29 19:09:21 2010
@@ -33,6 +33,7 @@ import org.apache.hadoop.chukwa.extracti
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessorFactory;
+import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessor;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
@@ -64,12 +65,10 @@ public class Demux extends Configured im
public static class MapClass extends MapReduceBase implements
Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
- JobConf jobConf = null;
@Override
public void configure(JobConf jobConf) {
super.configure(jobConf);
- this.jobConf = jobConf;
Demux.jobConf= jobConf;
}
@@ -85,9 +84,13 @@ public class Demux extends Configured im
log.debug("Entry: [" + chunk.getData() + "] EventType: ["
+ chunk.getDataType() + "]");
}
- String processorClass = jobConf
- .get(chunk.getDataType(),
- "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+
+ String defaultProcessor = Demux.jobConf.get(
+ "chukwa.demux.mapper.default.processor",
+ "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+
+ String processorClass = Demux.jobConf.get(chunk.getDataType(),
+ defaultProcessor);
if (!processorClass.equalsIgnoreCase("Drop")) {
reporter.incrCounter("DemuxMapInput", "total chunks", 1);
@@ -119,12 +122,12 @@ public class Demux extends Configured im
public static class ReduceClass extends MapReduceBase implements
Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {
-
+
public void configure(JobConf jobConf) {
super.configure(jobConf);
Demux.jobConf = jobConf;
}
-
+
public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
throws IOException {
@@ -136,8 +139,12 @@ public class Demux extends Configured im
reporter.incrCounter("DemuxReduceInput", key.getReduceType()
+ " total distinct keys", 1);
- ReduceProcessorFactory.getProcessor(key.getReduceType()).process(key,
- values, chukwaOutputCollector, reporter);
+ String defaultProcessor = Demux.jobConf.get(
+ "chukwa.demux.reducer.default.processor", null);
+ ReduceProcessor processor = ReduceProcessorFactory.getProcessor(
+ key.getReduceType(), defaultProcessor);
+
+ processor.process(key, values, chukwaOutputCollector, reporter);
if (log.isDebugEnabled()) {
duration = System.currentTimeMillis() - duration;
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java?rev=939464&r1=939463&r2=939464&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java Thu Apr 29 19:09:21 2010
@@ -41,7 +41,7 @@ public class ReduceProcessorFactory {
private ReduceProcessorFactory() {
}
- public static ReduceProcessor getProcessor(String reduceType)
+ public static ReduceProcessor getProcessor(String reduceType, String defaultProcessor)
throws UnknownReduceTypeException {
String path = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer."
+ reduceType;
@@ -54,8 +54,14 @@ public class ReduceProcessorFactory {
.newInstance();
} catch (ClassNotFoundException e) {
// ******** WARNING ********
- // If the ReduceProcessor is not there use Identity instead
- processor = getProcessor("IdentityReducer");
+ // If the ReduceProcessor is not there, see if there is a configured
+ // default processor. If not, fall back on the IdentityReducer instead.
+ if(defaultProcessor != null) {
+ processor = getProcessor(defaultProcessor, null);
+ }
+ else {
+ processor = getProcessor("IdentityReducer", null);
+ }
register(reduceType, processor);
return processor;
} catch (Exception e) {
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java?rev=939464&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java Thu Apr 29 19:09:21 2010
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.ChunkBuilder;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ChukwaTestOutputCollector;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+import junit.framework.TestCase;
+
+
+import java.io.IOException;
+
+/**
+ * Tests that settings related to the Demux mapper do what they should.
+ */
+public class TestDemuxMapperConfigs extends TestCase {
+
+ public static String SAMPLE_RECORD_DATA = "sampleRecordData";
+
+ public void testSetDefaultMapProcessor() throws IOException {
+ Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> mapper =
+ new Demux.MapClass();
+
+ JobConf conf = new JobConf();
+ conf.set("chukwa.demux.mapper.default.processor",
+ "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MockMapProcessor");
+ mapper.configure(conf);
+
+ ChunkBuilder cb = new ChunkBuilder();
+ cb.addRecord(SAMPLE_RECORD_DATA.getBytes());
+ ChunkImpl chunk = (ChunkImpl)cb.getChunk();
+
+ ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
+ new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
+
+ mapper.map(new ChukwaArchiveKey(), chunk, output, Reporter.NULL);
+ ChukwaRecordKey recordKey = new ChukwaRecordKey("someReduceType", SAMPLE_RECORD_DATA);
+
+ assertEquals("MockMapProcessor never invoked - no records found", 1, output.data.size());
+ assertNotNull("MockMapProcessor never invoked", output.data.get(recordKey));
+ }
+}
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java?rev=939464&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java Thu Apr 29 19:09:21 2010
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ChukwaTestOutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import junit.framework.TestCase;
+
+import java.io.IOException;
+
+/**
+ * Tests that settings related to the Demux reducer do what they should.
+ */
+public class TestDemuxReducerConfigs extends TestCase {
+
+ public static String SAMPLE_RECORD_DATA = "sampleRecordData";
+
+ public void testSetDefaultReducerProcessor() throws IOException {
+ Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> reducer =
+ new Demux.ReduceClass();
+
+ JobConf conf = new JobConf();
+ conf.set("chukwa.demux.reducer.default.processor", "MockReduceProcessor");
+ reducer.configure(conf);
+
+ ChukwaRecordKey key = new ChukwaRecordKey("someReduceType", "someKey");
+ ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
+ new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
+
+ reducer.reduce(key, null, output, Reporter.NULL);
+
+ assertEquals("MockReduceProcessor never invoked - no records found", 1, output.data.size());
+ assertNotNull("MockReduceProcessor never invoked", output.data.get(key));
+ assertEquals("MockReduceProcessor never invoked - key value incorrect",
+ "MockReduceProcessorValue",
+ output.data.get(key).getValue("MockReduceProcessorKey"));
+ }
+}
\ No newline at end of file
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MockMapProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MockMapProcessor.java?rev=939464&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MockMapProcessor.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MockMapProcessor.java Thu Apr 29 19:09:21 2010
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public class MockMapProcessor extends AbstractProcessor {
+
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+ Reporter reporter) throws Throwable {
+ ChukwaRecordKey key = new ChukwaRecordKey("someReduceType", recordEntry);
+ ChukwaRecord record = new ChukwaRecord();
+
+ output.collect(key, record);
+ }
+
+}
\ No newline at end of file
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MockReduceProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MockReduceProcessor.java?rev=939464&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MockReduceProcessor.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MockReduceProcessor.java Thu Apr 29 19:09:21 2010
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.util.Iterator;
+import java.io.IOException;
+
+public class MockReduceProcessor implements ReduceProcessor {
+
+ public String getDataType() {
+ return "MockDataType";
+ }
+
+ public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+ Reporter reporter) {
+ ChukwaRecord record = new ChukwaRecord();
+ record.add("MockReduceProcessorKey", "MockReduceProcessorValue");
+
+ try {
+ output.collect(key, record);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file