You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/14 00:53:06 UTC

git commit: TEZ-438. Allow users to configure a Partitioner in Tez-Engine components, which can be instantiated via reflection (part of TEZ-398). (sseth)

Updated Branches:
  refs/heads/TEZ-398 668728f90 -> 057239907


TEZ-438. Allow users to configure a Partitioner in Tez-Engine
components, which can be instantiated via reflection (part of TEZ-398).
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/05723990
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/05723990
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/05723990

Branch: refs/heads/TEZ-398
Commit: 057239907905325c6397753335d2d67f39e91b17
Parents: 668728f
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Sep 13 15:52:41 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Sep 13 15:52:41 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java     |  7 ++
 .../org/apache/tez/engine/api/Partitioner.java  | 21 ++++-
 .../tez/engine/common/TezEngineUtils.java       | 43 ++++++++++
 .../engine/common/sort/impl/ExternalSorter.java | 11 ++-
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  |  3 +
 .../mapreduce/newpartition/MRPartitioner.java   | 89 ++++++++++++++++++++
 6 files changed, 166 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/05723990/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index 7d8730e..af53bca 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -124,6 +124,13 @@ public class TezJobConfig {
   public static final int DEFAULT_TEZ_ENGINE_SORT_THREADS = 1;
 
   /**
+   * Specifies a partitioner class, which is used in Tez engine components like OnFileSortedOutput
+   */
+  public static final String TEZ_ENGINE_PARTITIONER_CLASS = "tez.engine.partitioner.class";
+  
+  public static final String TEZ_ENGINE_NUM_EXPECTED_PARTITIONS = "tez.engine.num.expected.partitions";
+  
+  /**
    * 
    */
   public static final String COUNTERS_MAX_KEY = "tez.engine.job.counters.max";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/05723990/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java b/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
index cbef463..ccf3cb8 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
@@ -17,9 +17,26 @@
  */
 package org.apache.tez.engine.api;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+
 /**
- * {@link Partitioner} is used by the TEZ framework to partition 
- * output key/value pairs.
+ * {@link Partitioner} is used by the TEZ framework to partition output
+ * key/value pairs.
+ * 
+ * <p><b>Partitioner Initialization</b></p> The Partitioner class is picked up
+ * using the {@link TezJobConfig#TEZ_ENGINE_PARTITIONER_CLASS} attribute.
+ * 
+ * TODO NEWTEZ Change construction to first check for a Constructor with a byte[] payload
+ * 
+ * Partitioners need to provide a single argument ({@link Configuration})
+ * constructor or a 0 argument constructor. If both exist, preference is given
+ * to the single argument constructor. This is primarily for MR support.
+ * 
+ * If using the configuration constructor, TEZ_ENGINE_NUM_EXPECTED_PARTITIONS
+ * will be set in the configuration, to indicate the max number of expected
+ * partitions.
+ * 
  */
 public interface Partitioner {
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/05723990/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
index b3287c9..d9bb280 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
@@ -18,6 +18,15 @@
 
 package org.apache.tez.engine.common;
 
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.api.Partitioner;
+
 public class TezEngineUtils {
 
   public static String getTaskIdentifier(String vertexName, int taskIndex) {
@@ -36,4 +45,52 @@ public class TezEngineUtils {
         taskAttemptNumber);
   }
 
+  /**
+   * Creates the {@link Partitioner} named by
+   * {@link TezJobConfig#TEZ_ENGINE_PARTITIONER_CLASS}.
+   * <p>
+   * A public constructor taking a single {@link Configuration} is preferred;
+   * otherwise a public 0-argument constructor is used.
+   *
+   * @param conf configuration naming the partitioner class; it is also passed
+   *          to the partitioner's {@link Configuration} constructor, if any
+   * @return a new partitioner instance
+   * @throws IOException if no partitioner class is configured, the class has
+   *           no usable constructor, or instantiation fails
+   */
+  public static Partitioner instantiatePartitioner(Configuration conf)
+      throws IOException {
+    // The 3-argument getClass also verifies that the configured class
+    // implements Partitioner (instead of relying on an unchecked cast).
+    Class<? extends Partitioner> clazz = conf.getClass(
+        TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, null, Partitioner.class);
+    if (clazz == null) {
+      throw new IOException("No partitioner class configured; set "
+          + TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS);
+    }
+
+    try {
+      Constructor<? extends Partitioner> ctorWithConf;
+      try {
+        ctorWithConf = clazz.getConstructor(Configuration.class);
+      } catch (NoSuchMethodException e) {
+        // No (Configuration) constructor; fall back to a 0 argument one.
+        return clazz.getConstructor().newInstance();
+      }
+      return ctorWithConf.newInstance(conf);
+    } catch (NoSuchMethodException e) {
+      throw new IOException("Partitioner " + clazz.getName()
+          + " needs a public (Configuration) or 0 argument constructor", e);
+    } catch (SecurityException e) {
+      throw new IOException(e);
+    } catch (IllegalArgumentException e) {
+      throw new IOException(e);
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    } catch (InvocationTargetException e) {
+      throw new IOException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/05723990/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
index 4df1c01..32847e7 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
@@ -48,6 +48,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.engine.api.Partitioner;
 import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.TezEngineUtils;
 import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
 import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
 import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
@@ -138,6 +139,10 @@ public abstract class ExternalSorter {
 
     // Task outputs
     mapOutputFile = instantiateTaskOutputManager(this.conf, outputContext);
+    
+    LOG.info("Instantiating Partitioner: [" + conf.get(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS) + "]");
+    this.conf.setInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, this.partitions);
+    this.partitioner = TezEngineUtils.instantiatePartitioner(this.conf);
   }
 
   // TODO NEWTEZ Add an interface (! Processor) for CombineProcessor, which MR tasks can initialize and set.
@@ -145,11 +150,6 @@ public abstract class ExternalSorter {
   public void setCombiner(Processor combineProcessor) {
     this.combineProcessor = combineProcessor;
   }
-  
-  // TODO NEWTEZ Setup a config value for the Partitioner class, from where it can be initialized.
-  public void setPartitioner(Partitioner partitioner) {
-    this.partitioner = partitioner;
-  }
 
   /**
    * Exception indicating that the allocated sort buffer is insufficient to hold
@@ -242,5 +242,4 @@ public abstract class ExternalSorter {
                   TezTaskOutputFiles.class.getName()), e);
     }
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/05723990/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 759e173..2f4a62a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -55,9 +55,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.newpartition.MRPartitioner;
 
 
 public class MRHelpers {
@@ -364,6 +366,7 @@ public class MRHelpers {
     // the AM anyway.
 
     // TODO eventually ACLs
+    conf.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName());
     setWorkingDirectory(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/05723990/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
new file mode 100644
index 0000000..07517cb
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.newpartition;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Adapts a MapReduce partitioner (old {@code org.apache.hadoop.mapred} API or
+ * new {@code org.apache.hadoop.mapreduce} API) to the Tez
+ * {@link org.apache.tez.engine.api.Partitioner} interface.
+ * <p>
+ * The API flavour is chosen from {@code mapred.mapper.new-api}. When more than
+ * one partition is expected (see
+ * {@link TezJobConfig#TEZ_ENGINE_NUM_EXPECTED_PARTITIONS}), the user-configured
+ * partitioner is instantiated reflectively, defaulting to the hash partitioner
+ * of the matching API; otherwise every record is sent to the last partition
+ * ({@code numPartitions - 1}).
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
+
+  static final Log LOG = LogFactory.getLog(MRPartitioner.class);
+
+  private final boolean useNewApi;
+  private final int partitions;
+
+  private org.apache.hadoop.mapreduce.Partitioner newPartitioner;
+  Partitioner oldPartitioner;
+
+  /**
+   * Creates the wrapped MapReduce partitioner.
+   *
+   * @param conf configuration carrying the MapReduce partitioner settings and
+   *          {@link TezJobConfig#TEZ_ENGINE_NUM_EXPECTED_PARTITIONS}
+   *          (treated as 1 when unset)
+   */
+  public MRPartitioner(Configuration conf) {
+    this.useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
+    this.partitions = conf.getInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, 1);
+
+    if (useNewApi) {
+      if (partitions > 1) {
+        // The default must be the new-API HashPartitioner; the imported
+        // org.apache.hadoop.mapred.lib.HashPartitioner implements only the old
+        // interface and would fail the cast below.
+        newPartitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils
+            .newInstance(
+                (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
+                    .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
+                        org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class),
+                conf);
+      } else {
+        newPartitioner = new org.apache.hadoop.mapreduce.Partitioner() {
+          @Override
+          public int getPartition(Object key, Object value, int numPartitions) {
+            return numPartitions - 1;
+          }
+        };
+      }
+    } else {
+      if (partitions > 1) {
+        // ReflectionUtils only invokes JobConfigurable.configure(JobConf) when
+        // handed a JobConf, so wrap a plain Configuration to make sure old-API
+        // partitioners get configured.
+        JobConf jobConf = (conf instanceof JobConf) ? (JobConf) conf : new JobConf(conf);
+        oldPartitioner = (Partitioner) ReflectionUtils.newInstance(
+            (Class<? extends Partitioner>) conf.getClass(
+                "mapred.partitioner.class", HashPartitioner.class), jobConf);
+      } else {
+        oldPartitioner = new Partitioner() {
+          @Override
+          public void configure(JobConf job) {
+          }
+
+          @Override
+          public int getPartition(Object key, Object value, int numPartitions) {
+            return numPartitions - 1;
+          }
+        };
+      }
+    }
+  }
+
+  /**
+   * Delegates to the wrapped old- or new-API partitioner.
+   *
+   * @return the partition index chosen by the wrapped partitioner
+   */
+  @Override
+  public int getPartition(Object key, Object value, int numPartitions) {
+    if (useNewApi) {
+      return newPartitioner.getPartition(key, value, numPartitions);
+    } else {
+      return oldPartitioner.getPartition(key, value, numPartitions);
+    }
+  }
+}
\ No newline at end of file