You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2010/04/29 21:09:22 UTC

svn commit: r939464 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/extraction/demux/ src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ src/test/org/apache/hadoop/chukwa/extraction/demux/ src/test/org/apache/hadoop/c...

Author: asrabkin
Date: Thu Apr 29 19:09:21 2010
New Revision: 939464

URL: http://svn.apache.org/viewvc?rev=939464&view=rev
Log:
CHUKWA-473. Make default processor configurable. Contributed by Bill Graham

Added:
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MockMapProcessor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MockReduceProcessor.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=939464&r1=939463&r2=939464&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Apr 29 19:09:21 2010
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    CHUKWA-473.  Make default processor configurable. (Bill Graham via asrabkin)
+
     CHUKWA-479.  Support HTTP trigger actions (Bill Graham via asrabkin)
 
     CHUKWA-469. Add JMSAdaptor. (Bill Graham via asrabkin)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java?rev=939464&r1=939463&r2=939464&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java Thu Apr 29 19:09:21 2010
@@ -33,6 +33,7 @@ import org.apache.hadoop.chukwa.extracti
 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
 import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessorFactory;
+import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessor;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
@@ -64,12 +65,10 @@ public class Demux extends Configured im
 
   public static class MapClass extends MapReduceBase implements
       Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
-    JobConf jobConf = null;
 
     @Override
     public void configure(JobConf jobConf) {
       super.configure(jobConf);
-      this.jobConf = jobConf;
       Demux.jobConf= jobConf;
     }
 
@@ -85,9 +84,13 @@ public class Demux extends Configured im
           log.debug("Entry: [" + chunk.getData() + "] EventType: ["
               + chunk.getDataType() + "]");
         }
-        String processorClass = jobConf
-            .get(chunk.getDataType(),
-                "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+
+        String defaultProcessor = Demux.jobConf.get(
+            "chukwa.demux.mapper.default.processor",
+            "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+
+        String processorClass = Demux.jobConf.get(chunk.getDataType(),
+                defaultProcessor);
 
         if (!processorClass.equalsIgnoreCase("Drop")) {
           reporter.incrCounter("DemuxMapInput", "total chunks", 1);
@@ -119,12 +122,12 @@ public class Demux extends Configured im
 
   public static class ReduceClass extends MapReduceBase implements
       Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {
-    
+
     public void configure(JobConf jobConf) {
       super.configure(jobConf);
       Demux.jobConf = jobConf;
     }
-    
+
     public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
         OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
         throws IOException {
@@ -136,8 +139,12 @@ public class Demux extends Configured im
         reporter.incrCounter("DemuxReduceInput", key.getReduceType()
             + " total distinct keys", 1);
 
-        ReduceProcessorFactory.getProcessor(key.getReduceType()).process(key,
-            values, chukwaOutputCollector, reporter);
+        String defaultProcessor = Demux.jobConf.get(
+            "chukwa.demux.reducer.default.processor", null);
+        ReduceProcessor processor = ReduceProcessorFactory.getProcessor(
+            key.getReduceType(), defaultProcessor);
+
+        processor.process(key, values, chukwaOutputCollector, reporter);
 
         if (log.isDebugEnabled()) {
           duration = System.currentTimeMillis() - duration;

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java?rev=939464&r1=939463&r2=939464&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java Thu Apr 29 19:09:21 2010
@@ -41,7 +41,7 @@ public class ReduceProcessorFactory {
   private ReduceProcessorFactory() {
   }
 
-  public static ReduceProcessor getProcessor(String reduceType)
+  public static ReduceProcessor getProcessor(String reduceType, String defaultProcessor)
       throws UnknownReduceTypeException {
     String path = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer."
         + reduceType;
@@ -54,8 +54,14 @@ public class ReduceProcessorFactory {
             .newInstance();
       } catch (ClassNotFoundException e) {
         // ******** WARNING ********
-        // If the ReduceProcessor is not there use Identity instead
-        processor = getProcessor("IdentityReducer");
+        // If the ReduceProcessor is not there, see if there is a configured
+        // default processor. If not, fall back on the IdentityReducer instead.
+        if(defaultProcessor != null) {
+          processor = getProcessor(defaultProcessor, null);
+        }
+        else {
+          processor = getProcessor("IdentityReducer", null);
+        }
         register(reduceType, processor);
         return processor;
       } catch (Exception e) {

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java?rev=939464&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java Thu Apr 29 19:09:21 2010
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import org.apache.hadoop.chukwa.ChukwaArchiveKey;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.ChunkBuilder;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ChukwaTestOutputCollector;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+import junit.framework.TestCase;
+
+
+import java.io.IOException;
+
+/**
+ * Tests that settings related to the Demux mapper do what they should.
+ */
+public class TestDemuxMapperConfigs extends TestCase {
+
+  public static  String SAMPLE_RECORD_DATA = "sampleRecordData";
+
+  public void testSetDefaultMapProcessor() throws IOException {
+    Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> mapper =
+            new Demux.MapClass();
+
+    JobConf conf = new JobConf();
+    conf.set("chukwa.demux.mapper.default.processor",
+             "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MockMapProcessor");
+    mapper.configure(conf);
+
+    ChunkBuilder cb = new ChunkBuilder();
+    cb.addRecord(SAMPLE_RECORD_DATA.getBytes());
+    ChunkImpl chunk = (ChunkImpl)cb.getChunk();
+
+    ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
+            new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
+
+    mapper.map(new ChukwaArchiveKey(), chunk, output, Reporter.NULL);
+    ChukwaRecordKey recordKey = new ChukwaRecordKey("someReduceType", SAMPLE_RECORD_DATA);
+
+    assertEquals("MockMapProcessor never invoked - no records found", 1, output.data.size());
+    assertNotNull("MockMapProcessor never invoked", output.data.get(recordKey));
+  }
+}

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java?rev=939464&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java Thu Apr 29 19:09:21 2010
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ChukwaTestOutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import junit.framework.TestCase;
+
+import java.io.IOException;
+
+/**
+ * Tests that settings related to the Demux reducer do what they should.
+ */
+public class TestDemuxReducerConfigs extends TestCase {
+
+  public static  String SAMPLE_RECORD_DATA = "sampleRecordData";
+
+  public void testSetDefaultReducerProcessor() throws IOException {
+    Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> reducer =
+            new Demux.ReduceClass();
+
+    JobConf conf = new JobConf();
+    conf.set("chukwa.demux.reducer.default.processor", "MockReduceProcessor");
+    reducer.configure(conf);
+
+    ChukwaRecordKey key = new ChukwaRecordKey("someReduceType", "someKey");
+    ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
+            new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
+
+    reducer.reduce(key, null, output, Reporter.NULL);
+
+    assertEquals("MockReduceProcessor never invoked - no records found", 1, output.data.size());
+    assertNotNull("MockReduceProcessor never invoked", output.data.get(key));
+    assertEquals("MockReduceProcessor never invoked - key value incorrect",
+            "MockReduceProcessorValue",
+            output.data.get(key).getValue("MockReduceProcessorKey"));
+  }
+}
\ No newline at end of file

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MockMapProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MockMapProcessor.java?rev=939464&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MockMapProcessor.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MockMapProcessor.java Thu Apr 29 19:09:21 2010
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+public class MockMapProcessor extends AbstractProcessor {
+
+  protected void parse(String recordEntry,
+                       OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+                       Reporter reporter) throws Throwable {
+    ChukwaRecordKey key = new ChukwaRecordKey("someReduceType", recordEntry);
+    ChukwaRecord record = new ChukwaRecord();
+
+    output.collect(key, record);
+  }
+  
+}
\ No newline at end of file

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MockReduceProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MockReduceProcessor.java?rev=939464&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MockReduceProcessor.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MockReduceProcessor.java Thu Apr 29 19:09:21 2010
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.util.Iterator;
+import java.io.IOException;
+
+public class MockReduceProcessor implements ReduceProcessor {
+
+  public String getDataType() {
+    return "MockDataType";
+  }
+
+  public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+                      OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
+                      Reporter reporter) {
+    ChukwaRecord record = new ChukwaRecord();
+    record.add("MockReduceProcessorKey", "MockReduceProcessorValue");
+
+    try {
+      output.collect(key, record);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
\ No newline at end of file