You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/09/24 20:13:37 UTC

svn commit: r698680 - in /incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer: DistinctCombiner.java JobControlCompiler.java MRCompiler.java MapReduceOper.java

Author: gates
Date: Wed Sep 24 11:13:36 2008
New Revision: 698680

URL: http://svn.apache.org/viewvc?rev=698680&view=rev
Log:
PIG-450 Add combiner to distinct calculations.


Added:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java

Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java?rev=698680&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java Wed Sep 24 11:13:36 2008
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.io.NullableTuple;
+
+/**
+ * Combiner used only for DISTINCT.  It does not unpack the records; for each
+ * key it forwards the first value and throws away the remaining (duplicate)
+ * values, in order to minimize the data being sent to the reduce.
+ */
+public class DistinctCombiner {
+
+    public static class Combine extends MapReduceBase
+            implements
+            Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
+        private final Log log = LogFactory.getLog(getClass()); // NOTE(review): unused here
+
+        ProgressableReporter pigReporter; // re-pointed at the task Reporter in reduce()
+        
+        /**
+         * Runs the base configuration, then creates the ProgressableReporter.
+         */
+        @Override
+        public void configure(JobConf jConf) {
+            super.configure(jConf);
+            pigReporter = new ProgressableReporter();
+        }
+        
+        /**
+         * Collects the key with only its first value; other values are dropped.
+         */
+        public void reduce(PigNullableWritable key,
+                Iterator<NullableTuple> tupIter,
+                OutputCollector<PigNullableWritable, Writable> oc,
+                Reporter reporter) throws IOException {
+            
+            pigReporter.setRep(reporter);
+
+            // Collect the key with just the first value; the rest of tupIter is
+            // never read.  NOTE(review): assumes tupIter always has >= 1 value.
+            NullableTuple val = tupIter.next();
+            oc.collect(key, val);
+        }
+    }
+    
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=698680&r1=698679&r2=698680&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Sep 24 11:13:36 2008
@@ -332,6 +332,9 @@
                     jobConf.setCombinerClass(PigCombiner.Combine.class);
                     jobConf.set("pig.combinePlan", ObjectSerializer.serialize(mro.combinePlan));
                     jobConf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
+                } else if (mro.needsDistinctCombiner()) {
+                    jobConf.setCombinerClass(DistinctCombiner.Combine.class);
+                    log.info("Setting identity combiner class.");
                 }
                 pack = (POPackage)mro.reducePlan.getRoots().get(0);
                 mro.reducePlan.remove(pack);

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=698680&r1=698679&r2=698680&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Sep 24 11:13:36 2008
@@ -830,6 +830,7 @@
                     flat1);
             nfe1.setResultType(DataType.BAG);
             curMROp.reducePlan.addAsLeaf(nfe1);
+            curMROp.setNeedsDistinctCombiner(true);
         }catch(Exception e){
             VisitorException pe = new VisitorException(e.getMessage());
             pe.initCause(e);

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=698680&r1=698679&r2=698680&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Wed Sep 24 11:13:36 2008
@@ -76,6 +76,10 @@
     
     //Indicates if this job is an order by job
     boolean globalSort = false;
+
+    // If true, adding the distinct combiner (DistinctCombiner) to this
+    // mapreduce job will speed things up.
+    boolean needsDistinctCombiner = false;
     
     //The quantiles file name if globalSort is true
     String quantFile;
@@ -223,6 +227,14 @@
         this.globalSort = globalSort;
     }
 
+    public boolean needsDistinctCombiner() { 
+        return needsDistinctCombiner; // read by JobControlCompiler when choosing a combiner
+    }
+
+    public void setNeedsDistinctCombiner(boolean nic) {
+        needsDistinctCombiner = nic; // MRCompiler sets true, presumably when compiling DISTINCT
+    }
+
     public String getQuantFile() {
         return quantFile;
     }