You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2012/12/07 03:36:34 UTC
svn commit: r1418173 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoo...
Author: acmurthy
Date: Fri Dec 7 02:36:33 2012
New Revision: 1418173
URL: http://svn.apache.org/viewvc?rev=1418173&view=rev
Log:
MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins. Contributed by Avner BenHanoch.
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1418173&r1=1418172&r2=1418173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Dec 7 02:36:33 2012
@@ -11,6 +11,9 @@ Trunk (Unreleased)
MAPREDUCE-2669. Add new examples for Mean, Median, and Standard Deviation.
(Plamen Jeliazkov via shv)
+ MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins.
+ (Avner BenHanoch via acmurthy)
+
IMPROVEMENTS
MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1418173&r1=1418172&r2=1418173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Fri Dec 7 02:36:33 2012
@@ -340,6 +340,7 @@ public class ReduceTask extends Task {
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
+ ShuffleConsumerPlugin shuffleConsumerPlugin = null;
boolean isLocal = false;
// local if
@@ -358,8 +359,14 @@ public class ReduceTask extends Task {
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
- Shuffle shuffle =
- new Shuffle(getTaskID(), job, FileSystem.getLocal(job), umbilical,
+ Class<? extends ShuffleConsumerPlugin> clazz =
+ job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
+
+ shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
+ LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
+
+ ShuffleConsumerPlugin.Context shuffleContext =
+ new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
@@ -368,7 +375,8 @@ public class ReduceTask extends Task {
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile);
- rIter = shuffle.run();
+ shuffleConsumerPlugin.init(shuffleContext);
+ rIter = shuffleConsumerPlugin.run();
} else {
// local job runner doesn't have a copy phase
copyPhase.complete();
@@ -399,6 +407,10 @@ public class ReduceTask extends Task {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
+
+ if (shuffleConsumerPlugin != null) {
+ shuffleConsumerPlugin.close();
+ }
done(umbilical, reporter);
}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java?rev=1418173&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ShuffleConsumerPlugin.java Fri Dec 7 02:36:33 2012
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * ShuffleConsumerPlugin for serving Reducers. It may shuffle map output files
+ * (MOFs) from either the built-in ShuffleHandler or a 3rd party AuxiliaryService.
+ *
+ */
+@InterfaceAudience.LimitedPrivate("mapreduce")
+@InterfaceStability.Unstable
+public interface ShuffleConsumerPlugin<K, V> {
+
+ public void init(Context<K, V> context);
+
+ public RawKeyValueIterator run() throws IOException, InterruptedException;
+
+ public void close();
+
+ @InterfaceAudience.LimitedPrivate("mapreduce")
+ @InterfaceStability.Unstable
+ public static class Context<K,V> {
+ private final org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
+ private final JobConf jobConf;
+ private final FileSystem localFS;
+ private final TaskUmbilicalProtocol umbilical;
+ private final LocalDirAllocator localDirAllocator;
+ private final Reporter reporter;
+ private final CompressionCodec codec;
+ private final Class<? extends Reducer> combinerClass;
+ private final CombineOutputCollector<K, V> combineCollector;
+ private final Counters.Counter spilledRecordsCounter;
+ private final Counters.Counter reduceCombineInputCounter;
+ private final Counters.Counter shuffledMapsCounter;
+ private final Counters.Counter reduceShuffleBytes;
+ private final Counters.Counter failedShuffleCounter;
+ private final Counters.Counter mergedMapOutputsCounter;
+ private final TaskStatus status;
+ private final Progress copyPhase;
+ private final Progress mergePhase;
+ private final Task reduceTask;
+ private final MapOutputFile mapOutputFile;
+
+ public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
+ JobConf jobConf, FileSystem localFS,
+ TaskUmbilicalProtocol umbilical,
+ LocalDirAllocator localDirAllocator,
+ Reporter reporter, CompressionCodec codec,
+ Class<? extends Reducer> combinerClass,
+ CombineOutputCollector<K,V> combineCollector,
+ Counters.Counter spilledRecordsCounter,
+ Counters.Counter reduceCombineInputCounter,
+ Counters.Counter shuffledMapsCounter,
+ Counters.Counter reduceShuffleBytes,
+ Counters.Counter failedShuffleCounter,
+ Counters.Counter mergedMapOutputsCounter,
+ TaskStatus status, Progress copyPhase, Progress mergePhase,
+ Task reduceTask, MapOutputFile mapOutputFile) {
+ this.reduceId = reduceId;
+ this.jobConf = jobConf;
+ this.localFS = localFS;
+ this. umbilical = umbilical;
+ this.localDirAllocator = localDirAllocator;
+ this.reporter = reporter;
+ this.codec = codec;
+ this.combinerClass = combinerClass;
+ this.combineCollector = combineCollector;
+ this.spilledRecordsCounter = spilledRecordsCounter;
+ this.reduceCombineInputCounter = reduceCombineInputCounter;
+ this.shuffledMapsCounter = shuffledMapsCounter;
+ this.reduceShuffleBytes = reduceShuffleBytes;
+ this.failedShuffleCounter = failedShuffleCounter;
+ this.mergedMapOutputsCounter = mergedMapOutputsCounter;
+ this.status = status;
+ this.copyPhase = copyPhase;
+ this.mergePhase = mergePhase;
+ this.reduceTask = reduceTask;
+ this.mapOutputFile = mapOutputFile;
+ }
+
+ public org.apache.hadoop.mapreduce.TaskAttemptID getReduceId() {
+ return reduceId;
+ }
+ public JobConf getJobConf() {
+ return jobConf;
+ }
+ public FileSystem getLocalFS() {
+ return localFS;
+ }
+ public TaskUmbilicalProtocol getUmbilical() {
+ return umbilical;
+ }
+ public LocalDirAllocator getLocalDirAllocator() {
+ return localDirAllocator;
+ }
+ public Reporter getReporter() {
+ return reporter;
+ }
+ public CompressionCodec getCodec() {
+ return codec;
+ }
+ public Class<? extends Reducer> getCombinerClass() {
+ return combinerClass;
+ }
+ public CombineOutputCollector<K, V> getCombineCollector() {
+ return combineCollector;
+ }
+ public Counters.Counter getSpilledRecordsCounter() {
+ return spilledRecordsCounter;
+ }
+ public Counters.Counter getReduceCombineInputCounter() {
+ return reduceCombineInputCounter;
+ }
+ public Counters.Counter getShuffledMapsCounter() {
+ return shuffledMapsCounter;
+ }
+ public Counters.Counter getReduceShuffleBytes() {
+ return reduceShuffleBytes;
+ }
+ public Counters.Counter getFailedShuffleCounter() {
+ return failedShuffleCounter;
+ }
+ public Counters.Counter getMergedMapOutputsCounter() {
+ return mergedMapOutputsCounter;
+ }
+ public TaskStatus getStatus() {
+ return status;
+ }
+ public Progress getCopyPhase() {
+ return copyPhase;
+ }
+ public Progress getMergePhase() {
+ return mergePhase;
+ }
+ public Task getReduceTask() {
+ return reduceTask;
+ }
+ public MapOutputFile getMapOutputFile() {
+ return mapOutputFile;
+ }
+ } // end of public static class Context<K,V>
+
+}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1418173&r1=1418172&r2=1418173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Fri Dec 7 02:36:33 2012
@@ -85,6 +85,9 @@ public interface MRConfig {
public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
+ public static final String SHUFFLE_CONSUMER_PLUGIN =
+ "mapreduce.job.reduce.shuffle.consumer.plugin.class";
+
/**
* Configuration key to enable/disable IFile readahead.
*/
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1418173&r1=1418172&r2=1418173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Fri Dec 7 02:36:33 2012
@@ -34,73 +34,63 @@ import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progress;
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate("mapreduce")
@InterfaceStability.Unstable
@SuppressWarnings({"unchecked", "rawtypes"})
-public class Shuffle<K, V> implements ExceptionReporter {
+public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
private static final int PROGRESS_FREQUENCY = 2000;
private static final int MAX_EVENTS_TO_FETCH = 10000;
private static final int MIN_EVENTS_TO_FETCH = 100;
private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
- private final TaskAttemptID reduceId;
- private final JobConf jobConf;
- private final Reporter reporter;
- private final ShuffleClientMetrics metrics;
- private final TaskUmbilicalProtocol umbilical;
+ private ShuffleConsumerPlugin.Context context;
+
+ private TaskAttemptID reduceId;
+ private JobConf jobConf;
+ private Reporter reporter;
+ private ShuffleClientMetrics metrics;
+ private TaskUmbilicalProtocol umbilical;
- private final ShuffleScheduler<K,V> scheduler;
- private final MergeManager<K, V> merger;
+ private ShuffleScheduler<K,V> scheduler;
+ private MergeManager<K, V> merger;
private Throwable throwable = null;
private String throwingThreadName = null;
- private final Progress copyPhase;
- private final TaskStatus taskStatus;
- private final Task reduceTask; //Used for status updates
-
- public Shuffle(TaskAttemptID reduceId, JobConf jobConf, FileSystem localFS,
- TaskUmbilicalProtocol umbilical,
- LocalDirAllocator localDirAllocator,
- Reporter reporter,
- CompressionCodec codec,
- Class<? extends Reducer> combinerClass,
- CombineOutputCollector<K,V> combineCollector,
- Counters.Counter spilledRecordsCounter,
- Counters.Counter reduceCombineInputCounter,
- Counters.Counter shuffledMapsCounter,
- Counters.Counter reduceShuffleBytes,
- Counters.Counter failedShuffleCounter,
- Counters.Counter mergedMapOutputsCounter,
- TaskStatus status,
- Progress copyPhase,
- Progress mergePhase,
- Task reduceTask,
- MapOutputFile mapOutputFile) {
- this.reduceId = reduceId;
- this.jobConf = jobConf;
- this.umbilical = umbilical;
- this.reporter = reporter;
+ private Progress copyPhase;
+ private TaskStatus taskStatus;
+ private Task reduceTask; //Used for status updates
+
+ @Override
+ public void init(ShuffleConsumerPlugin.Context context) {
+ this.context = context;
+
+ this.reduceId = context.getReduceId();
+ this.jobConf = context.getJobConf();
+ this.umbilical = context.getUmbilical();
+ this.reporter = context.getReporter();
this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
- this.copyPhase = copyPhase;
- this.taskStatus = status;
- this.reduceTask = reduceTask;
+ this.copyPhase = context.getCopyPhase();
+ this.taskStatus = context.getStatus();
+ this.reduceTask = context.getReduceTask();
scheduler =
- new ShuffleScheduler<K,V>(jobConf, status, this, copyPhase,
- shuffledMapsCounter,
- reduceShuffleBytes, failedShuffleCounter);
- merger = new MergeManager<K, V>(reduceId, jobConf, localFS,
- localDirAllocator, reporter, codec,
- combinerClass, combineCollector,
- spilledRecordsCounter,
- reduceCombineInputCounter,
- mergedMapOutputsCounter,
- this, mergePhase, mapOutputFile);
+ new ShuffleScheduler<K,V>(jobConf, taskStatus, this, copyPhase,
+ context.getShuffledMapsCounter(),
+ context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+ merger = new MergeManager<K, V>(reduceId, jobConf, context.getLocalFS(),
+ context.getLocalDirAllocator(), reporter, context.getCodec(),
+ context.getCombinerClass(), context.getCombineCollector(),
+ context.getSpilledRecordsCounter(),
+ context.getReduceCombineInputCounter(),
+ context.getMergedMapOutputsCounter(),
+ this, context.getMergePhase(), context.getMapOutputFile());
}
+ @Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Scale the maximum events we fetch per RPC call to mitigate OOM issues
// on the ApplicationMaster when a thundering herd of reducers fetch events
@@ -171,6 +161,10 @@ public class Shuffle<K, V> implements Ex
return kvIter;
}
+ @Override
+ public void close(){
+ }
+
public synchronized void reportException(Throwable t) {
if (throwable == null) {
throwable = t;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1418173&r1=1418172&r2=1418173&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Dec 7 02:36:33 2012
@@ -748,6 +748,16 @@
</description>
</property>
+<property>
+ <name>mapreduce.job.reduce.shuffle.consumer.plugin.class</name>
+ <value>org.apache.hadoop.mapreduce.task.reduce.Shuffle</value>
+ <description>
+ Name of the class whose instance will be used
+ to send shuffle requests by the reduce tasks of this job.
+ The class must implement org.apache.hadoop.mapred.ShuffleConsumerPlugin.
+ </description>
+</property>
+
<!-- MR YARN Application properties -->
<property>
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java?rev=1418173&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestShufflePlugin.java Fri Dec 7 02:36:33 2012
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.task.reduce.Shuffle;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.ReduceTask;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+
+/**
+ * A JUnit test for verifying availability and accessibility of shuffle related API.
+ * It is needed for maintaining compatibility with external sub-classes of
+ * ShuffleConsumerPlugin and AuxiliaryService(s) like ShuffleHandler.
+ *
+ * The importance of this test is for preserving API with 3rd party plugins.
+ */
+public class TestShufflePlugin<K, V> {
+
+ static class TestShuffleConsumerPlugin<K, V> implements ShuffleConsumerPlugin<K, V> {
+
+ @Override
+ public void init(ShuffleConsumerPlugin.Context<K, V> context) {
+ // just verify that Context has kept its public interface
+ context.getReduceId();
+ context.getJobConf();
+ context.getLocalFS();
+ context.getUmbilical();
+ context.getLocalDirAllocator();
+ context.getReporter();
+ context.getCodec();
+ context.getCombinerClass();
+ context.getCombineCollector();
+ context.getSpilledRecordsCounter();
+ context.getReduceCombineInputCounter();
+ context.getShuffledMapsCounter();
+ context.getReduceShuffleBytes();
+ context.getFailedShuffleCounter();
+ context.getMergedMapOutputsCounter();
+ context.getStatus();
+ context.getCopyPhase();
+ context.getMergePhase();
+ context.getReduceTask();
+ context.getMapOutputFile();
+ }
+
+ @Override
+ public void close(){
+ }
+
+ @Override
+ public RawKeyValueIterator run() throws java.io.IOException, java.lang.InterruptedException{
+ return null;
+ }
+ }
+
+
+
+ @Test
+ /**
+ * A testing method instructing core hadoop to load an external ShuffleConsumerPlugin
+ * as if it came from a 3rd party.
+ */
+ public void testPluginAbility() {
+
+ try{
+ // create JobConf with mapreduce.job.reduce.shuffle.consumer.plugin.class=TestShuffleConsumerPlugin
+ JobConf jobConf = new JobConf();
+ jobConf.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN,
+ TestShufflePlugin.TestShuffleConsumerPlugin.class,
+ ShuffleConsumerPlugin.class);
+
+ ShuffleConsumerPlugin shuffleConsumerPlugin = null;
+ Class<? extends ShuffleConsumerPlugin> clazz =
+ jobConf.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
+ assertNotNull("Unable to get " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, clazz);
+
+ // load 3rd party plugin through core's factory method
+ shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, jobConf);
+ assertNotNull("Unable to load " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, shuffleConsumerPlugin);
+ }
+ catch (Exception e) {
+ assertTrue("Threw exception:" + e, false);
+ }
+ }
+
+ @Test
+ /**
+ * A testing method verifying availability and accessibility of API that is needed
+ * for sub-classes of ShuffleConsumerPlugin
+ */
+ public void testConsumerApi() {
+
+ JobConf jobConf = new JobConf();
+ ShuffleConsumerPlugin<K, V> shuffleConsumerPlugin = new TestShuffleConsumerPlugin<K, V>();
+
+ //mock creation
+ ReduceTask mockReduceTask = mock(ReduceTask.class);
+ TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+ Reporter mockReporter = mock(Reporter.class);
+ FileSystem mockFileSystem = mock(FileSystem.class);
+ Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = jobConf.getCombinerClass();
+ @SuppressWarnings("unchecked") // needed for mock with generic
+ CombineOutputCollector<K, V> mockCombineOutputCollector =
+ (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+ org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+ mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+ LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+ CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+ Counter mockCounter = mock(Counter.class);
+ TaskStatus mockTaskStatus = mock(TaskStatus.class);
+ Progress mockProgress = mock(Progress.class);
+ MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+ Task mockTask = mock(Task.class);
+
+ try {
+ String [] dirs = jobConf.getLocalDirs();
+ // verify that these APIs are available through super class handler
+ ShuffleConsumerPlugin.Context<K, V> context =
+ new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, jobConf, mockFileSystem,
+ mockUmbilical, mockLocalDirAllocator,
+ mockReporter, mockCompressionCodec,
+ combinerClass, mockCombineOutputCollector,
+ mockCounter, mockCounter, mockCounter,
+ mockCounter, mockCounter, mockCounter,
+ mockTaskStatus, mockProgress, mockProgress,
+ mockTask, mockMapOutputFile);
+ shuffleConsumerPlugin.init(context);
+ shuffleConsumerPlugin.run();
+ shuffleConsumerPlugin.close();
+ }
+ catch (Exception e) {
+ assertTrue("Threw exception:" + e, false);
+ }
+
+ // verify that these APIs are available for 3rd party plugins
+ mockReduceTask.getTaskID();
+ mockReduceTask.getJobID();
+ mockReduceTask.getNumMaps();
+ mockReduceTask.getPartition();
+ mockReporter.progress();
+ }
+
+ @Test
+ /**
+ * A testing method verifying availability and accessibility of API needed for
+ * AuxiliaryService(s) which are "Shuffle-Providers" (ShuffleHandler and 3rd party plugins)
+ */
+ public void testProviderApi() {
+
+ ApplicationId mockApplicationId = mock(ApplicationId.class);
+ mockApplicationId.setClusterTimestamp(new Long(10));
+ mockApplicationId.setId(mock(JobID.class).getId());
+ LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+ JobConf mockJobConf = mock(JobConf.class);
+ try {
+ mockLocalDirAllocator.getLocalPathToRead("", mockJobConf);
+ }
+ catch (Exception e) {
+ assertTrue("Threw exception:" + e, false);
+ }
+ }
+}