You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/14 00:53:06 UTC
git commit: TEZ-438. Allow users to configure a Partitioner in
Tez-Engine components,
which can be instantiated via reflection (part of TEZ-398). (sseth)
Updated Branches:
refs/heads/TEZ-398 668728f90 -> 057239907
TEZ-438. Allow users to configure a Partitioner in Tez-Engine
components, which can be instantiated via reflection (part of TEZ-398).
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/05723990
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/05723990
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/05723990
Branch: refs/heads/TEZ-398
Commit: 057239907905325c6397753335d2d67f39e91b17
Parents: 668728f
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Sep 13 15:52:41 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Sep 13 15:52:41 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/common/TezJobConfig.java | 7 ++
.../org/apache/tez/engine/api/Partitioner.java | 21 ++++-
.../tez/engine/common/TezEngineUtils.java | 43 ++++++++++
.../engine/common/sort/impl/ExternalSorter.java | 11 ++-
.../apache/tez/mapreduce/hadoop/MRHelpers.java | 3 +
.../mapreduce/newpartition/MRPartitioner.java | 89 ++++++++++++++++++++
6 files changed, 166 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/05723990/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index 7d8730e..af53bca 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -124,6 +124,13 @@ public class TezJobConfig {
public static final int DEFAULT_TEZ_ENGINE_SORT_THREADS = 1;
/**
+ * Specifies a partitioner class, which is used in Tez engine components like
+ * OnFileSortedOutput. The class must implement the Tez engine
+ * {@code Partitioner} interface and provide either a constructor taking a
+ * single {@code Configuration} (preferred when both exist) or a no-argument
+ * constructor.
+ */
+ public static final String TEZ_ENGINE_PARTITIONER_CLASS = "tez.engine.partitioner.class";
+
+ /**
+ * Number of partitions the configured partitioner should expect. Engine
+ * components such as ExternalSorter set this in the configuration before
+ * instantiating the partitioner, so that partitioners built via a
+ * {@code Configuration} constructor can read it.
+ */
+ public static final String TEZ_ENGINE_NUM_EXPECTED_PARTITIONS = "tez.engine.num.expected.partitions";
+
+ /**
*
*/
public static final String COUNTERS_MAX_KEY = "tez.engine.job.counters.max";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/05723990/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java b/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
index cbef463..ccf3cb8 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
@@ -17,9 +17,26 @@
*/
package org.apache.tez.engine.api;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+
/**
- * {@link Partitioner} is used by the TEZ framework to partition
- * output key/value pairs.
+ * {@link Partitioner} is used by the TEZ framework to partition output
+ * key/value pairs.
+ *
+ * <p><b>Partitioner Initialization</b></p> The Partitioner class is picked up
+ * using the TEZ_ENGINE_PARTITIONER_CLASS attribute in {@link TezJobConfig}.
+ *
+ * TODO NEWTEZ Change construction to first check for a Constructor with a byte[] payload
+ *
+ * Partitioners need to provide a single argument ({@link Configuration})
+ * constructor or a 0 argument constructor. If both exist, preference is given
+ * to the single argument constructor. This is primarily for MR support.
+ *
+ * If using the configuration constructor, TEZ_ENGINE_NUM_EXPECTED_PARTITIONS
+ * will be set in the configuration, to indicate the max number of expected
+ * partitions.
+ *
*/
public interface Partitioner {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/05723990/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
index b3287c9..d9bb280 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
@@ -18,6 +18,15 @@
package org.apache.tez.engine.common;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.api.Partitioner;
+
public class TezEngineUtils {
public static String getTaskIdentifier(String vertexName, int taskIndex) {
@@ -36,4 +45,38 @@ public class TezEngineUtils {
taskAttemptNumber);
}
+ @SuppressWarnings("unchecked")
+ public static Partitioner instantiatePartitioner(Configuration conf)
+ throws IOException {
+ Class<? extends Partitioner> clazz = (Class<? extends Partitioner>) conf
+ .getClass(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, Partitioner.class);
+
+ Partitioner partitioner = null;
+
+ try {
+ Constructor<? extends Partitioner> ctorWithConf = clazz
+ .getConstructor(Configuration.class);
+ partitioner = ctorWithConf.newInstance(conf);
+ } catch (SecurityException e) {
+ throw new IOException(e);
+ } catch (NoSuchMethodException e) {
+ try {
+ // Try a 0 argument constructor.
+ partitioner = clazz.newInstance();
+ } catch (InstantiationException e1) {
+ throw new IOException(e1);
+ } catch (IllegalAccessException e1) {
+ throw new IOException(e1);
+ }
+ } catch (IllegalArgumentException e) {
+ throw new IOException(e);
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ } catch (InvocationTargetException e) {
+ throw new IOException(e);
+ }
+ return partitioner;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/05723990/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
index 4df1c01..32847e7 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
@@ -48,6 +48,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.engine.api.Partitioner;
import org.apache.tez.engine.api.Processor;
import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.TezEngineUtils;
import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
@@ -138,6 +139,10 @@ public abstract class ExternalSorter {
// Task outputs
mapOutputFile = instantiateTaskOutputManager(this.conf, outputContext);
+
+ LOG.info("Instantiating Partitioner: [" + conf.get(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS) + "]");
+ this.conf.setInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, this.partitions);
+ this.partitioner = TezEngineUtils.instantiatePartitioner(this.conf);
}
// TODO NEWTEZ Add an interface (! Processor) for CombineProcessor, which MR tasks can initialize and set.
@@ -145,11 +150,6 @@ public abstract class ExternalSorter {
public void setCombiner(Processor combineProcessor) {
this.combineProcessor = combineProcessor;
}
-
- // TODO NEWTEZ Setup a config value for the Partitioner class, from where it can be initialized.
- public void setPartitioner(Partitioner partitioner) {
- this.partitioner = partitioner;
- }
/**
* Exception indicating that the allocated sort buffer is insufficient to hold
@@ -242,5 +242,4 @@ public abstract class ExternalSorter {
TezTaskOutputFiles.class.getName()), e);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/05723990/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 759e173..2f4a62a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -55,9 +55,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.newpartition.MRPartitioner;
public class MRHelpers {
@@ -364,6 +366,7 @@ public class MRHelpers {
// the AM anyway.
// TODO eventually ACLs
+ conf.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName());
setWorkingDirectory(conf);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/05723990/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
new file mode 100644
index 0000000..07517cb
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.newpartition;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Tez {@link org.apache.tez.engine.api.Partitioner} that delegates to a
+ * MapReduce partitioner. It uses either the new ({@code mapreduce}) or the old
+ * ({@code mapred}) API, selected by {@code mapred.mapper.new-api}.
+ * <p>
+ * The expected number of partitions is read from
+ * {@link TezJobConfig#TEZ_ENGINE_NUM_EXPECTED_PARTITIONS} (default 1).
+ * <ul>
+ * <li>With more than one partition, the configured MR partitioner class is
+ * instantiated reflectively. It defaults to the {@code HashPartitioner} of the
+ * matching API.</li>
+ * <li>Otherwise, every record is assigned to partition
+ * {@code numPartitions - 1}.</li>
+ * </ul>
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
+
+  static final Log LOG = LogFactory.getLog(MRPartitioner.class);
+
+  private final boolean useNewApi;
+  private final int partitions;
+
+  private org.apache.hadoop.mapreduce.Partitioner newPartitioner;
+  Partitioner oldPartitioner;
+
+  /**
+   * @param conf configuration providing the API selection, the expected
+   *          number of partitions and, when more than one partition is
+   *          expected, the MR partitioner class
+   */
+  public MRPartitioner(Configuration conf) {
+    this.useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
+    this.partitions = conf.getInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, 1);
+
+    if (useNewApi) {
+      if (partitions > 1) {
+        // The default must be the new-API HashPartitioner. The imported
+        // mapred.lib.HashPartitioner implements the old mapred.Partitioner,
+        // so using it here would fail the cast with a ClassCastException.
+        newPartitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils
+            .newInstance(
+                (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
+                    .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
+                        org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class),
+                conf);
+      } else {
+        newPartitioner = new org.apache.hadoop.mapreduce.Partitioner() {
+          @Override
+          public int getPartition(Object key, Object value, int numPartitions) {
+            return numPartitions - 1;
+          }
+        };
+      }
+    } else {
+      if (partitions > 1) {
+        oldPartitioner = (Partitioner) ReflectionUtils.newInstance(
+            (Class<? extends Partitioner>) conf.getClass(
+                "mapred.partitioner.class", HashPartitioner.class), conf);
+      } else {
+        oldPartitioner = new Partitioner() {
+          @Override
+          public void configure(JobConf job) {
+          }
+
+          @Override
+          public int getPartition(Object key, Object value, int numPartitions) {
+            return numPartitions - 1;
+          }
+        };
+      }
+    }
+  }
+
+  /**
+   * Delegates to the partitioner for the configured MR API.
+   *
+   * @param key record key
+   * @param value record value
+   * @param numPartitions total number of partitions
+   * @return the partition chosen by the underlying MR partitioner
+   */
+  @Override
+  public int getPartition(Object key, Object value, int numPartitions) {
+    if (useNewApi) {
+      return newPartitioner.getPartition(key, value, numPartitions);
+    } else {
+      return oldPartitioner.getPartition(key, value, numPartitions);
+    }
+  }
+}
\ No newline at end of file