You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC
svn commit: r1457129 [16/38] - in /incubator/tez: ./ tez-ampool/
tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/
tez-ampool/src/main/conf/ tez-ampool/src/main/java/
tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.processor.reduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Master;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * {@link Processor} that runs a MapReduce reducer, written against either the
+ * old {@code mapred} or the new {@code mapreduce} API, over the records of a
+ * {@link ShuffledMergedInput} and writes the results to a {@link SimpleOutput}.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class ReduceProcessor
+extends MRTask
+implements Processor {
+
+  private static final Log LOG = LogFactory.getLog(ReduceProcessor.class);
+
+  private Progress sortPhase;
+  private Progress reducePhase;
+
+  private Counter reduceInputKeyCounter;
+  private Counter reduceInputValueCounter;
+  // Number of map tasks, published as the task in-degree in
+  // localizeConfiguration().
+  private final int numMapTasks;
+
+  /**
+   * Creates the processor for the given task.
+   *
+   * @param context task description; it is cast to {@link MRTaskContext} to
+   *                read the number of map tasks
+   */
+  @Inject
+  public ReduceProcessor(
+      @Assisted TezTask context
+      ) {
+    super(context);
+    MRTaskContext mrTaskContext = (MRTaskContext)context;
+    this.numMapTasks = mrTaskContext.getNumMapTasks();
+  }
+
+  @Override
+  public void initialize(Configuration conf, Master master) throws IOException,
+  InterruptedException {
+    super.initialize(conf, master);
+
+  }
+
+  /**
+   * Runs the configured reducer over {@code in} and writes its results to
+   * {@code out}, then marks the task as done.
+   *
+   * @param in  reduce input; must be a {@link ShuffledMergedInput}
+   * @param out reduce output; it is cast to {@link SimpleOutput} before the
+   *            reducer is run
+   * @throws IOException if {@code in} is not a {@link ShuffledMergedInput},
+   *                     if the new-API reducer class cannot be found, or if
+   *                     reading, reducing or writing fails
+   * @throws InterruptedException if the task is interrupted
+   */
+  @Override
+  public void process(Input in, Output out)
+      throws IOException, InterruptedException {
+    MRTaskReporter reporter = new MRTaskReporter(getTaskReporter());
+    // This is the reduce side, so the reducer API flag (not the mapper one)
+    // decides which reducer flavour is run.
+    boolean useNewApi = jobConf.getUseNewReducer();
+    initTask(jobConf, getJobID(), reporter, useNewApi);
+
+    // Sanity check: reject an unsupported input before any input or output
+    // is initialized.
+    if (!(in instanceof ShuffledMergedInput)) {
+      throw new IOException("Illegal input to reduce: " + in.getClass());
+    }
+    ShuffledMergedInput shuffleInput = (ShuffledMergedInput)in;
+    shuffleInput.setTask(this);
+
+    if (out instanceof SimpleOutput) {
+      ((SimpleOutput)out).setTask(this);
+    }
+
+    in.initialize(jobConf, getTaskReporter());
+    out.initialize(jobConf, getTaskReporter());
+
+    sortPhase = getProgress().addPhase("sort");
+    reducePhase = getProgress().addPhase("reduce");
+    sortPhase.complete(); // sort is complete
+    setPhase(TezTaskStatus.Phase.REDUCE);
+
+    this.statusUpdate();
+
+    Class keyClass = ConfigUtils.getMapOutputKeyClass(jobConf);
+    Class valueClass = ConfigUtils.getMapOutputValueClass(jobConf);
+    RawComparator comparator =
+        ConfigUtils.getOutputValueGroupingComparator(jobConf);
+
+    reduceInputKeyCounter =
+        reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+    reduceInputValueCounter =
+        reporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);
+
+    // NOTE(review): this cast fails with a ClassCastException for any output
+    // that is not a SimpleOutput; IntermediateTask pairs this processor with
+    // OnFileSortedOutput - confirm that combination is supported.
+    SimpleOutput simpleOutput = (SimpleOutput) out;
+
+    if (useNewApi) {
+      try {
+        runNewReducer(
+            jobConf,
+            (TezTaskUmbilicalProtocol)getUmbilical(), reporter,
+            shuffleInput, comparator, keyClass, valueClass,
+            simpleOutput);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+    } else {
+      runOldReducer(
+          jobConf, (TezTaskUmbilicalProtocol)getUmbilical(), reporter,
+          shuffleInput, comparator, keyClass, valueClass, simpleOutput);
+    }
+
+    done(out.getOutputContext(), reporter);
+  }
+
+  public void close() throws IOException, InterruptedException {
+    // TODO Auto-generated method stub
+
+  }
+
+  /**
+   * Runs an old-API ({@code org.apache.hadoop.mapred}) reducer over the input.
+   * The reducer and the output are closed exactly once, whether the reduce
+   * loop completes or fails.
+   *
+   * @param job        job configuration; also supplies the reducer class and
+   *                   the value grouping comparator
+   * @param umbilical  currently unused
+   * @param reporter   progress and counter reporter handed to the reducer
+   * @param input      source of the merged key/value stream
+   * @param comparator currently unused; the grouping comparator is read from
+   *                   {@code job} instead
+   * @param keyClass   class of the input keys
+   * @param valueClass class of the input values
+   * @param output     destination of the reducer's output records
+   * @throws IOException if reading, reducing, writing or closing fails
+   * @throws InterruptedException if the task is interrupted
+   */
+  void runOldReducer(JobConf job,
+      TezTaskUmbilicalProtocol umbilical,
+      final MRTaskReporter reporter,
+      ShuffledMergedInput input,
+      RawComparator comparator,
+      Class keyClass,
+      Class valueClass,
+      final SimpleOutput output) throws IOException, InterruptedException {
+
+    Reducer reducer =
+        ReflectionUtils.newInstance(job.getReducerClass(), job);
+
+    // make output collector
+
+    OutputCollector collector =
+        new OutputCollector() {
+          public void collect(Object key, Object value)
+              throws IOException {
+            try {
+              output.write(key, value);
+            } catch (InterruptedException ie) {
+              throw new IOException(ie);
+            }
+          }
+        };
+
+    // Each flag is raised just before the corresponding close() is attempted,
+    // so the cleanup below never closes the same resource a second time.
+    boolean reducerCloseAttempted = false;
+    boolean outputCloseAttempted = false;
+
+    // apply reduce function
+    try {
+      ReduceValuesIterator values =
+          new ReduceValuesIterator(
+              input,
+              job.getOutputValueGroupingComparator(), keyClass, valueClass,
+              job, reporter, reduceInputValueCounter, reducePhase);
+
+      values.informReduceProgress();
+      while (values.more()) {
+        reduceInputKeyCounter.increment(1);
+        reducer.reduce(values.getKey(), values, collector, reporter);
+        values.nextKey();
+        values.informReduceProgress();
+      }
+
+      reducerCloseAttempted = true;
+      reducer.close();
+      outputCloseAttempted = true;
+      output.close();
+    } finally {
+      // Only reached with work left to do when something above failed.
+      // Cleanup errors are logged rather than thrown so that they do not
+      // replace the original failure.
+      if (!reducerCloseAttempted) {
+        try {
+          reducer.close();
+        } catch (IOException e) {
+          LOG.warn("Ignoring exception while closing reducer after failure", e);
+        }
+      }
+
+      if (!outputCloseAttempted) {
+        try {
+          output.close();
+        } catch (IOException e) {
+          LOG.warn("Ignoring exception while closing output after failure", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Values iterator that counts every value handed to the reducer and
+   * reports reduce progress from the underlying input's progress.
+   */
+  private static class ReduceValuesIterator<KEY,VALUE>
+  extends org.apache.tez.engine.common.task.impl.ValuesIterator<KEY,VALUE> {
+    private final Counter reduceInputValueCounter;
+    private final Progress reducePhase;
+
+    public ReduceValuesIterator (ShuffledMergedInput in,
+        RawComparator<KEY> comparator,
+        Class<KEY> keyClass,
+        Class<VALUE> valClass,
+        Configuration conf, Progressable reporter,
+        Counter reduceInputValueCounter,
+        Progress reducePhase)
+            throws IOException {
+      super(in.getIterator(), comparator, keyClass, valClass, conf, reporter);
+      this.reduceInputValueCounter = reduceInputValueCounter;
+      this.reducePhase = reducePhase;
+    }
+
+    /** Increments the input-value counter, then returns the next value. */
+    @Override
+    public VALUE next() {
+      reduceInputValueCounter.increment(1);
+      return moveToNext();
+    }
+
+    /** Returns the next value without touching the counter. */
+    protected VALUE moveToNext() {
+      return super.next();
+    }
+
+    /** Copies the input's progress into the reduce phase and pings the reporter. */
+    public void informReduceProgress() {
+      reducePhase.set(super.in.getProgress().getProgress()); // update progress
+      reporter.progress();
+    }
+  }
+
+  /**
+   * Runs a new-API ({@code org.apache.hadoop.mapreduce}) reducer over the
+   * input. The record writer wrapping {@code out} is closed even when the
+   * reducer fails.
+   *
+   * @param job        job configuration
+   * @param umbilical  currently unused
+   * @param reporter   receives the input's progress after every record
+   * @param input      source of the merged key/value stream
+   * @param comparator grouping comparator passed to the reduce context
+   * @param keyClass   class of the input keys
+   * @param valueClass class of the input values
+   * @param out        destination of the reducer's output records
+   * @throws IOException if reading, reducing, writing or closing fails
+   * @throws InterruptedException if the task is interrupted
+   * @throws ClassNotFoundException if the reducer class cannot be loaded
+   */
+  void runNewReducer(JobConf job,
+      final TezTaskUmbilicalProtocol umbilical,
+      final MRTaskReporter reporter,
+      ShuffledMergedInput input,
+      RawComparator comparator,
+      Class keyClass,
+      Class valueClass,
+      final SimpleOutput out
+      ) throws IOException,InterruptedException,
+      ClassNotFoundException {
+    // wrap value iterator to report progress.
+    final TezRawKeyValueIterator rawIter = input.getIterator();
+    TezRawKeyValueIterator rIter = new TezRawKeyValueIterator() {
+      public void close() throws IOException {
+        rawIter.close();
+      }
+      public DataInputBuffer getKey() throws IOException {
+        return rawIter.getKey();
+      }
+      public Progress getProgress() {
+        return rawIter.getProgress();
+      }
+      public DataInputBuffer getValue() throws IOException {
+        return rawIter.getValue();
+      }
+      public boolean next() throws IOException {
+        boolean ret = rawIter.next();
+        reporter.setProgress(rawIter.getProgress().getProgress());
+        return ret;
+      }
+    };
+
+    // make a task context so we can get the classes
+    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+        new TaskAttemptContextImpl(job, getTaskAttemptId(), reporter);
+
+    // make a reducer
+    org.apache.hadoop.mapreduce.Reducer reducer =
+        (org.apache.hadoop.mapreduce.Reducer)
+        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
+
+    org.apache.hadoop.mapreduce.RecordWriter trackedRW =
+        new org.apache.hadoop.mapreduce.RecordWriter() {
+
+          @Override
+          public void write(Object key, Object value) throws IOException,
+          InterruptedException {
+            out.write(key, value);
+          }
+
+          @Override
+          public void close(TaskAttemptContext context) throws IOException,
+          InterruptedException {
+            out.close();
+          }
+        };
+
+    org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
+        createReduceContext(
+            reducer, job, getTaskAttemptId(),
+            rIter, reduceInputKeyCounter,
+            reduceInputValueCounter,
+            trackedRW,
+            committer,
+            reporter, comparator, keyClass,
+            valueClass);
+    try {
+      reducer.run(reducerContext);
+    } finally {
+      // Close the writer (and with it the output) even if the reducer threw.
+      trackedRW.close(reducerContext);
+    }
+  }
+
+  /**
+   * Marks the configuration as belonging to a non-map task and records the
+   * number of map tasks as the task in-degree.
+   */
+  @Override
+  public void localizeConfiguration(JobConf jobConf)
+      throws IOException, InterruptedException {
+    super.localizeConfiguration(jobConf);
+    jobConf.setBoolean(JobContext.TASK_ISMAP, false);
+    jobConf.setInt(TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, numMapTasks);
+  }
+
+  @Override
+  public TezCounter getOutputRecordsCounter() {
+    return reporter.getCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
+  }
+
+  // NOTE(review): this returns the REDUCE_INPUT_GROUPS counter, not
+  // REDUCE_INPUT_RECORDS - confirm that counting groups is intended here.
+  @Override
+  public TezCounter getInputRecordsCounter() {
+    return reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+  }
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class FinalTask extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, SimpleOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, ShuffledMergedInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, ReduceProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class InitialTask extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, SimpleInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, OnFileSortedOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, MapProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.output.InMemorySortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class InitialTaskWithInMemSort extends AbstractModule {
+// TODO EVENTUALLY - have all types subclass a single parent instead of AbstractModule.
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, SimpleInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, InMemorySortedOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, MapProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class InitialTaskWithLocalSort extends AbstractModule {
+// TODO EVENTUALLY - have all types subclass a single parent instead of AbstractModule.
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, SimpleInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, LocalOnFileSorterOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, MapProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+
+public class IntermediateTask extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, ShuffledMergedInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, OnFileSortedOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, ReduceProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.input.LocalMergedInput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class LocalFinalTask extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, SimpleOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, LocalMergedInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, ReduceProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class MapOnlyTask extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ install(
+ new FactoryModuleBuilder().implement(
+ Input.class, SimpleInput.class).
+ build(InputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Output.class, SimpleOutput.class).
+ build(OutputFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Processor.class, MapProcessor.class).
+ build(ProcessorFactory.class)
+ );
+ install(
+ new FactoryModuleBuilder().implement(
+ Task.class, RuntimeTask.class).
+ build(TaskFactory.class)
+ );
+
+ bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+ }
+
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/MRTaskContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/MRTaskContext.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/MRTaskContext.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/MRTaskContext.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.task.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.records.TezTaskAttemptID;
+
+// TODO XXX Writable serialization for now.
+/**
+ * A {@link TezTask} that additionally carries the split location, the split
+ * offset and the number of map tasks. The three extra fields are serialized
+ * after the superclass fields, in that order.
+ */
+public class MRTaskContext extends TezTask {
+  private String splitLocation;
+  private long splitOffset;
+  private int numMapTasks;
+
+  /** No-argument constructor, required for serialization. */
+  public MRTaskContext() {
+  }
+
+  /**
+   * Creates a fully populated context.
+   *
+   * @param taskAttemptId passed through to {@link TezTask}
+   * @param user passed through to {@link TezTask}
+   * @param jobName passed through to {@link TezTask}
+   * @param moduleClassName passed through to {@link TezTask}
+   * @param jobToken currently unused
+   * @param splitLocation value returned by {@link #getSplitLocation()}
+   * @param splitOffset value returned by {@link #getSplitOffset()}
+   * @param numMapTasks initial value returned by {@link #getNumMapTasks()}
+   */
+  // TODO TEZAM5 Remove jobToken from the constructor.
+  public MRTaskContext(TezTaskAttemptID taskAttemptId, String user,
+      String jobName, String moduleClassName, SecretKey jobToken,
+      String splitLocation, long splitOffset, int numMapTasks) {
+    super(taskAttemptId, user, jobName, moduleClassName);
+    this.numMapTasks = numMapTasks;
+    this.splitOffset = splitOffset;
+    this.splitLocation = splitLocation;
+  }
+
+  /** @return the split location */
+  public String getSplitLocation() {
+    return this.splitLocation;
+  }
+
+  /** @return the split offset */
+  public long getSplitOffset() {
+    return this.splitOffset;
+  }
+
+  /** @return the number of map tasks */
+  public int getNumMapTasks() {
+    return numMapTasks;
+  }
+
+  /** @param numMapTasks the new number of map tasks */
+  public void setNumMapTasks(int numMapTasks) {
+    this.numMapTasks = numMapTasks;
+  }
+
+  /**
+   * Writes the superclass state followed by the split location, the split
+   * offset and the number of map tasks.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    // TODO VERIFY That split location needs to be serialized.
+    WritableUtils.writeString(out, this.splitLocation);
+    out.writeLong(this.splitOffset);
+    out.writeInt(this.numMapTasks);
+  }
+
+  /**
+   * Reads the fields back in exactly the order {@link #write(DataOutput)}
+   * emitted them.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.splitLocation = WritableUtils.readString(in);
+    this.splitOffset = in.readLong();
+    this.numMapTasks = in.readInt();
+  }
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,239 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.task.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.tez.common.Constants;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class YarnOutputFiles extends MapOutputFile {
+
+  private JobConf conf;
+
+  private static final String JOB_OUTPUT_DIR = "output";
+  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
+      + ".index";
+
+  public YarnOutputFiles() {
+  }
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator(MRConfig.LOCAL_DIR);
+
+  /** Returns the task attempt id stored in the configuration. */
+  private String getAttemptId() {
+    return conf.get(JobContext.TASK_ATTEMPT_ID);
+  }
+
+  /** Returns the relative directory {@code output/<attempt id>}. */
+  private Path getAttemptOutputDir() {
+    return new Path(JOB_OUTPUT_DIR, getAttemptId());
+  }
+
+  /** Returns the name of the map output index file. */
+  private static String getOutputIndexFileName() {
+    return Constants.MAP_OUTPUT_FILENAME_STRING
+        + Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING;
+  }
+
+  /**
+   * Builds a spill (or spill index) file name. The pattern is applied exactly
+   * once so that a '%' in the attempt id is never re-interpreted as a format
+   * specifier.
+   *
+   * @param pattern {@link #SPILL_FILE_PATTERN} or
+   *                {@link #SPILL_INDEX_FILE_PATTERN}
+   * @param spillNumber the number
+   */
+  private String getSpillName(String pattern, int spillNumber) {
+    return String.format(pattern, getAttemptId(), spillNumber);
+  }
+
+  /**
+   * Return the path to local map output file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFile() throws IOException {
+    Path attemptOutput =
+        new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFileForWrite(long size) throws IOException {
+    Path attemptOutput =
+        new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size,
+        conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   *
+   * @param existing a path whose parent directory is used as the base for
+   *                 the {@code output/<attempt id>} directory
+   * @return path
+   */
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir, getAttemptId());
+    return new Path(attemptOutputDir, Constants.MAP_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFile() throws IOException {
+    Path attemptIndexOutput =
+        new Path(getAttemptOutputDir(), getOutputIndexFileName());
+    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    Path attemptIndexOutput =
+        new Path(getAttemptOutputDir(), getOutputIndexFileName());
+    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   *
+   * @param existing a path whose parent directory is used as the base for
+   *                 the {@code output/<attempt id>} directory
+   * @return path
+   */
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir, getAttemptId());
+    return new Path(attemptOutputDir, getOutputIndexFileName());
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        getSpillName(SPILL_FILE_PATTERN, spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    // The name is formatted once only; formatting the result a second time
+    // would treat any '%' in the attempt id as a format specifier.
+    return lDirAlloc.getLocalPathForWrite(
+        getSpillName(SPILL_FILE_PATTERN, spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        getSpillName(SPILL_INDEX_FILE_PATTERN, spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        getSpillName(SPILL_INDEX_FILE_PATTERN, spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier. Not supported by this
+   * implementation.
+   *
+   * @param mapId a map task id
+   * @return never returns normally
+   * @throws UnsupportedOperationException always
+   */
+  public Path getInputFile(int mapId) throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
+      long size) throws IOException {
+    String reduceInput = String.format(
+        Constants.REDUCE_INPUT_FILE_FORMAT_STRING,
+        getAttemptOutputDir().toString(), mapId.getId());
+    return lDirAlloc.getLocalPathForWrite(reduceInput, size, conf);
+  }
+
+  /**
+   * Removes all of the files related to a task. Not supported by this
+   * implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  public void removeAll() throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Stores the configuration, wrapping it in a {@link JobConf} when it is
+   * not already one.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+}
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnTezChild.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnTezChild.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnTezChild.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnTezChild.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,459 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.task.impl;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.log4j.LogManager;
+import org.apache.tez.api.Task;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.ContainerTask;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.records.TezJobID;
+import org.apache.tez.records.TezTaskAttemptID;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * The main() for TEZ MapReduce task processes.
+ */
+public class YarnTezChild {
+
+  private static final Log LOG = LogFactory.getLog(YarnTezChild.class);
+
+  /**
+   * Entry point of the child JVM. Opens an umbilical proxy to the address
+   * given on the command line, then repeatedly asks it for a task and runs
+   * that task until the returned {@link ContainerTask} says the child should
+   * die or an error occurs.
+   *
+   * @param args in order: umbilical host, umbilical port, MR job id,
+   *             {@link MRTaskType} name, container id
+   * @throws Throwable anything raised outside the guarded task loop
+   */
+  public static void main(String[] args) throws Throwable {
+    Thread.setDefaultUncaughtExceptionHandler(
+        new YarnUncaughtExceptionHandler());
+    LOG.debug("Child starting");
+
+    DeprecatedKeys.init();
+
+    final JobConf defaultConf = new JobConf();
+    defaultConf.addResource(MRJobConfig.JOB_CONF_FILE);
+    UserGroupInformation.setConfiguration(defaultConf);
+
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    final InetSocketAddress address =
+        NetUtils.createSocketAddrForHost(host, port);
+
+    final TezJobID jobID = IDConverter.fromMRJobId(JobID.forName(args[2]));
+
+    final MRTaskType taskType = MRTaskType.valueOf(args[3]);
+
+    final ContainerId containerId = ConverterUtils.toContainerId(args[4]);
+
+    // initialize metrics
+    DefaultMetricsSystem.initialize(
+        StringUtils.camelize(taskType.name() +"Task"));
+
+    // Security framework already loaded the tokens into current ugi
+    Credentials credentials =
+        UserGroupInformation.getCurrentUser().getCredentials();
+    LOG.info("Executing with tokens:");
+    for (Token<?> token: credentials.getAllTokens()) {
+      LOG.info(token);
+    }
+
+    // Create TaskUmbilicalProtocol as actual task owner.
+    UserGroupInformation taskOwner =
+        UserGroupInformation.createRemoteUser(jobID.toString());
+    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
+    SecurityUtil.setTokenService(jt, address);
+    taskOwner.addToken(jt);
+    final TezTaskUmbilicalProtocol umbilical =
+        taskOwner.doAs(
+            new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
+      @Override
+      public TezTaskUmbilicalProtocol run() throws Exception {
+        return (TezTaskUmbilicalProtocol)RPC.getProxy(
+            TezTaskUmbilicalProtocol.class,
+            TezTaskUmbilicalProtocol.versionID, address, defaultConf);
+      }
+    });
+
+    // The pid comes from the JVM_PID environment variable and is handed to
+    // the umbilical as part of the ContainerContext.
+    String pid = System.getenv().get("JVM_PID");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("PID, containerId: " + pid + ", " + containerId);
+    }
+    MRTaskContext taskContext = null;
+    ContainerTask containerTask = null;
+    UserGroupInformation childUGI = null;
+    TezTaskAttemptID taskAttemptId = null;
+    MRTask task = null;
+    ContainerContext containerContext = new ContainerContext(containerId, pid);
+
+    try {
+      while (true) {
+        // poll for new task; the wait grows by 500ms per attempt, starting
+        // at 0 and capped at 1500ms
+        for (int idle = 0; null == containerTask; ++idle) {
+          long sleepTimeMilliSecs = Math.min(idle * 500, 1500);
+          LOG.info("Sleeping for " + sleepTimeMilliSecs
+              + "ms before retrying again. Got null now.");
+          MILLISECONDS.sleep(sleepTimeMilliSecs);
+          containerTask = umbilical.getTask(containerContext);
+        }
+        LOG.info("TaskInfo: shouldDie: "
+            + containerTask.shouldDie()
+            + (containerTask.shouldDie() ? "" : ", taskAttemptId: "
+                + containerTask.getMrTaskContext().getTaskAttemptId()));
+
+        if (containerTask.shouldDie()) {
+          return;
+        }
+        taskContext = containerTask.getMrTaskContext();
+
+        taskAttemptId = taskContext.getTaskAttemptId();
+
+        final Task t = createAndConfigureTezTask(taskContext, umbilical,
+            credentials, jt);
+        task = (MRTask) t.getProcessor();
+        final JobConf job = task.getConf();
+
+        // Initiate Java VM metrics
+        JvmMetrics.initSingleton(containerId.toString(), job.getSessionId());
+        childUGI = UserGroupInformation.createRemoteUser(System
+            .getenv(ApplicationConstants.Environment.USER.toString()));
+        // Add tokens to new user so that it may execute its task correctly.
+        childUGI.addCredentials(credentials);
+
+        childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws Exception {
+            runTezTask(t, umbilical, job); // run the task
+            return null;
+          }
+        });
+        FileSystem.closeAllForUGI(childUGI);
+        // Clearing the reference makes the next iteration poll again.
+        containerTask = null;
+      }
+    } catch (FSError e) {
+      LOG.fatal("FSError from child", e);
+      umbilical.fsError(taskAttemptId, e.getMessage());
+    } catch (Exception exception) {
+      LOG.warn("Exception running child : "
+          + StringUtils.stringifyException(exception));
+      try {
+        if (task != null) {
+          // do cleanup for the task
+          if (childUGI == null) { // no need to jump into doAs block
+            task.taskCleanup(umbilical);
+          } else {
+            final MRTask taskFinal = task;
+            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                taskFinal.taskCleanup(umbilical);
+                return null;
+              }
+            });
+          }
+        }
+      } catch (Exception e) {
+        LOG.info("Exception cleaning up: "
+            + StringUtils.stringifyException(e));
+      }
+      // Report back any failures, for diagnostic purposes
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      exception.printStackTrace(new PrintStream(baos));
+      if (taskAttemptId != null) {
+        umbilical.fatalError(taskAttemptId, baos.toString());
+      }
+    } catch (Throwable throwable) {
+      LOG.fatal("Error running child : "
+          + StringUtils.stringifyException(throwable));
+      if (taskAttemptId != null) {
+        Throwable tCause = throwable.getCause();
+        String cause = tCause == null
+            ? throwable.getMessage()
+            : StringUtils.stringifyException(tCause);
+        umbilical.fatalError(taskAttemptId, cause);
+      }
+    } finally {
+      RPC.stopProxy(umbilical);
+      DefaultMetricsSystem.shutdown();
+      // Shutting down log4j of the child-vm...
+      // This assumes that on return from Task.run()
+      // there is no more logging done.
+      LogManager.shutdown();
+    }
+  }
+
+  /**
+   * Configure mapred-local dirs. This config is used by the task for finding
+   * out an output directory. The local dirs come from the
+   * {@link ApplicationConstants#LOCAL_DIR_ENV} environment variable; a
+   * "work" directory is looked up under them (and created when missing) and
+   * recorded as {@link MRJobConfig#JOB_LOCAL_DIR}.
+   *
+   * @param task currently unused
+   * @param job configuration that is updated in place
+   * @throws IOException if the work directory cannot be created
+   */
+  private static void configureLocalDirs(MRTask task, JobConf job)
+      throws IOException {
+    String[] localSysDirs = StringUtils.getTrimmedStrings(
+        System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
+    job.setStrings(TezJobConfig.LOCAL_DIR, localSysDirs);
+    LOG.info(TezJobConfig.LOCAL_DIR + " for child: " +
+        job.get(TezJobConfig.LOCAL_DIR));
+    LocalDirAllocator lDirAlloc =
+        new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
+    Path workDir = null;
+    // First, try to find the JOB_LOCAL_DIR on this host.
+    try {
+      workDir = lDirAlloc.getLocalPathToRead("work", job);
+    } catch (DiskErrorException e) {
+      // DiskErrorException means dir not found. If not found, it will
+      // be created below.
+    }
+    if (workDir == null) {
+      // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
+      workDir = lDirAlloc.getLocalPathForWrite("work", job);
+      FileSystem lfs = FileSystem.getLocal(job).getRaw();
+      boolean madeDir = false;
+      try {
+        madeDir = lfs.mkdirs(workDir);
+      } catch (FileAlreadyExistsException e) {
+        // Since all tasks will be running in their own JVM, the race condition
+        // exists where multiple tasks could be trying to create this directory
+        // at the same time. If this task loses the race, it's okay because
+        // the directory already exists.
+        madeDir = true;
+        workDir = lDirAlloc.getLocalPathToRead("work", job);
+      }
+      if (!madeDir) {
+        throw new IOException("Mkdirs failed to create "
+            + workDir.toString());
+      }
+    }
+    job.set(MRJobConfig.JOB_LOCAL_DIR,workDir.toString());
+  }
+
+  /**
+   * Applies the per-child settings to the task's configuration: application
+   * attempt id, IPC and local-output settings, job token secret, local
+   * dirs, task-specific localization and distributed-cache paths. The
+   * resulting configuration is written to the local job file and set back on
+   * the task.
+   *
+   * @param task the processor whose configuration is updated
+   * @param credentials currently unused
+   * @param jt job token whose password is turned into the task's secret key
+   * @return the task's updated configuration
+   */
+  private static JobConf configureTask(MRTask task, Credentials credentials,
+      Token<JobTokenIdentifier> jt) throws IOException, InterruptedException {
+    JobConf job = task.getConf();
+
+    String appAttemptIdEnv = System
+        .getenv(MRJobConfig.APPLICATION_ATTEMPT_ID_ENV);
+    LOG.debug("APPLICATION_ATTEMPT_ID: " + appAttemptIdEnv);
+    // Set it in conf, so as to be able to be used by the OutputCommitter.
+    job.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, Integer
+        .parseInt(appAttemptIdEnv));
+
+    // set tcp nodelay
+    job.setBoolean("ipc.client.tcpnodelay", true);
+    job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
+        YarnOutputFiles.class, MapOutputFile.class);
+    // set the job token secret into task
+    SecretKey sk = JobTokenSecretManager.createSecretKey(jt.getPassword());
+
+    task.setJobTokenSecret(sk);
+
+    // setup the child's MRConfig.LOCAL_DIR.
+    configureLocalDirs(task, job);
+
+    // setup the child's attempt directories
+    // Do the task-type specific localization
+    task.localizeConfiguration(job);
+
+    // Set up the DistributedCache related configs
+    setupDistributedCacheConfig(job);
+
+    // Overwrite the localized task jobconf which is linked to in the current
+    // work-dir.
+    Path localTaskFile = new Path(MRJobConfig.JOB_CONF_FILE);
+    writeLocalJobFile(localTaskFile, job);
+    task.setConf(job);
+    return job;
+  }
+
+  /**
+   * Set up the DistributedCache related configs to make
+   * {@link DistributedCache#getLocalCacheFiles(Configuration)}
+   * and
+   * {@link DistributedCache#getLocalCacheArchives(Configuration)}
+   * working.
+   * @param job
+   * @throws IOException
+   */
+  private static void setupDistributedCacheConfig(final JobConf job)
+      throws IOException {
+
+    // all symlinks are created in the current work-dir
+    String localWorkDir = System.getenv("PWD");
+
+    // Update the configuration object with localized archives.
+    setLocalizedCachePaths(job, MRJobConfig.CACHE_LOCALARCHIVES,
+        DistributedCache.getCacheArchives(job), localWorkDir);
+
+    // Update the configuration object with localized files.
+    setLocalizedCachePaths(job, MRJobConfig.CACHE_LOCALFILES,
+        DistributedCache.getCacheFiles(job), localWorkDir);
+  }
+
+  /**
+   * Maps every cache URI to a path under {@code localWorkDir}, named after
+   * the URI fragment or, without a fragment, the last path component, and
+   * stores the resulting list under {@code confKey}. Nothing is set when
+   * {@code cacheUris} is null or empty.
+   */
+  private static void setLocalizedCachePaths(JobConf job, String confKey,
+      URI[] cacheUris, String localWorkDir) {
+    if (cacheUris == null) {
+      return;
+    }
+    List<String> localPaths = new ArrayList<String>();
+    for (URI u : cacheUris) {
+      Path p = new Path(u);
+      Path name =
+          new Path((null == u.getFragment()) ? p.getName()
+              : u.getFragment());
+      String linkName = name.toUri().getPath();
+      localPaths.add(new Path(localWorkDir, linkName).toUri().getPath());
+    }
+    if (!localPaths.isEmpty()) {
+      job.set(confKey, StringUtils.arrayToString(
+          localPaths.toArray(new String[localPaths.size()])));
+    }
+  }
+
+  private static final FsPermission urw_gr =
+      FsPermission.createImmutable((short) 0640);
+
+  /**
+   * Write the task specific job-configuration file. Any existing file at
+   * {@code jobFile} on the local file system is deleted first; the new file
+   * is created with permission 0640.
+   * @throws IOException
+   */
+  private static void writeLocalJobFile(Path jobFile, JobConf conf)
+      throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    // Non-deprecated overload; 'true' matches what the deprecated
+    // single-argument delete(Path) forwards to.
+    localFs.delete(jobFile, true);
+    OutputStream out = null;
+    try {
+      out = FileSystem.create(localFs, jobFile, urw_gr);
+      conf.writeXml(out);
+    } finally {
+      IOUtils.cleanup(LOG, out);
+    }
+  }
+
+  /**
+   * Instantiates the Guice module named by the task context, uses it to
+   * create and initialize the {@link Task}, and configures the task's
+   * {@link MRTask} processor.
+   *
+   * @param taskContext supplies the module class name and the task details
+   * @param master umbilical handed to {@link Task#initialize}
+   * @param credentials set on the job configuration
+   * @param jt job token forwarded to the task configuration step
+   * @return the initialized and configured task
+   * @throws YarnException if the module class cannot be loaded or is not an
+   *         {@link AbstractModule}
+   */
+  private static Task createAndConfigureTezTask(
+      TezTask taskContext,
+      TezTaskUmbilicalProtocol master,
+      Credentials credentials, Token<JobTokenIdentifier> jt)
+      throws IOException, InterruptedException {
+    final JobConf job = new JobConf(MRJobConfig.JOB_CONF_FILE);
+    job.setCredentials(credentials);
+
+    // Create the appropriate guice task-module
+    AbstractModule taskModule = null;
+    LOG.info("Using Module: " + taskContext.getTaskModuleClassName());
+    try {
+      Class<?> moduleClazz = Class
+          .forName(taskContext.getTaskModuleClassName());
+      if (AbstractModule.class.isAssignableFrom(moduleClazz)) {
+        taskModule = (AbstractModule) ReflectionUtils.newInstance(moduleClazz,
+            job);
+      } else {
+        throw new YarnException("Module class: " + moduleClazz.getName()
+            + " should be an instance of "
+            + AbstractModule.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnException("Unable to load moduleClass: "
+          + taskContext.getTaskModuleClassName(), e);
+    }
+
+    // Use the injector to create & bind input, processor, output & task
+    Injector injector = Guice.createInjector(taskModule);
+    TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+    Task t = factory.createTask(taskContext);
+    t.initialize(job, master);
+
+    MRTask task = (MRTask)t.getProcessor();
+    configureTask(task, credentials, jt);
+
+    return t;
+  }
+
+  /**
+   * Sets the file system working directory from the job configuration, then
+   * runs and closes the task.
+   *
+   * @param t the task to run
+   * @param master currently unused
+   * @param job supplies the working directory
+   */
+  private static void runTezTask(
+      Task t, TezTaskUmbilicalProtocol master, JobConf job)
+      throws IOException, InterruptedException {
+    // use job-specified working directory
+    FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
+
+    // Run!
+    t.run();
+    t.close();
+  }
+}
Added: incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java (added)
+++ incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.api.Input;
+import org.apache.tez.api.Master;
+import org.apache.tez.api.Output;
+import org.apache.tez.api.Processor;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.mapreduce.task.FinalTask;
+import org.apache.tez.mapreduce.task.InitialTask;
+import org.apache.tez.mapreduce.task.IntermediateTask;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+/**
+ * Smoke tests for the Guice task modules declared at the bottom of this class.
+ *
+ * <p>Each test builds an injector from one module, obtains a
+ * {@link TezEngineFactory} from it, creates a {@link Task} for the shared task
+ * context and initializes that task. The tests contain no assertions; they
+ * pass when none of those steps throws.
+ */
+public class TestTaskModules {
+
+  private static final Log LOG = LogFactory.getLog(TestTaskModules.class);
+
+  MRTaskContext taskContext;
+  JobConf job;
+
+  /** Creates the task context and the job configuration used by every test. */
+  @Before
+  public void setUp() {
+    taskContext = new MRTaskContext(
+        TezTestUtils.getMockTaskAttemptId(0, 0, 0, MRTaskType.REDUCE),
+        "tez",
+        "tez",
+        TestInitialModule.class.getName(),
+        null,
+        "",
+        0,
+        0);
+    job = new JobConf();
+  }
+
+  /** Creates and initializes a task wired by {@link TestInitialModule}. */
+  @Test
+  public void testInitialTask() throws Exception {
+    TezEngineFactory engineFactory =
+        Guice.createInjector(new TestInitialModule())
+            .getInstance(TezEngineFactory.class);
+    Task initialTask = engineFactory.createTask(taskContext);
+    initialTask.initialize(job, null);
+  }
+
+  /** Creates and initializes a task wired by {@link TestIntermediateModule}. */
+  @Test
+  public void testIntermediateTask() throws Exception {
+    TezEngineFactory engineFactory =
+        Guice.createInjector(new TestIntermediateModule())
+            .getInstance(TezEngineFactory.class);
+    Task intermediateTask = engineFactory.createTask(taskContext);
+    intermediateTask.initialize(job, null);
+  }
+
+  /**
+   * Creates and initializes a task wired by {@link TestFinalModule}, logging
+   * the concrete task class before initialization.
+   */
+  @Test
+  public void testFinalTask() throws Exception {
+    TezEngineFactory engineFactory =
+        Guice.createInjector(new TestFinalModule())
+            .getInstance(TezEngineFactory.class);
+    Task finalTask = engineFactory.createTask(taskContext);
+    LOG.info("task = " + finalTask.getClass());
+    finalTask.initialize(job, null);
+  }
+
+  /**
+   * Minimal {@link Task} that stores the processor, input and output it is
+   * constructed with and logs their classes on initialization. Running and
+   * closing it do nothing.
+   */
+  static class TestTask implements Task {
+
+    private final Processor taskProcessor;
+    private final Input taskInput;
+    private final Output taskOutput;
+
+    @Inject
+    public TestTask(
+        @Assisted Processor processor,
+        @Assisted Input in,
+        @Assisted Output out) {
+      this.taskProcessor = processor;
+      this.taskInput = in;
+      this.taskOutput = out;
+    }
+
+    /** Logs the concrete classes of the input, processor and output. */
+    @Override
+    public void initialize(Configuration conf, Master master)
+        throws IOException, InterruptedException {
+      LOG.info("in = " + taskInput.getClass());
+      LOG.info("processor = " + taskProcessor.getClass());
+      LOG.info("out = " + taskOutput.getClass());
+    }
+
+    @Override
+    public Input getInput() {
+      return taskInput;
+    }
+
+    @Override
+    public Output getOutput() {
+      return taskOutput;
+    }
+
+    @Override
+    public Processor getProcessor() {
+      return taskProcessor;
+    }
+
+    @Override
+    public void run() throws IOException, InterruptedException {
+      // Intentionally empty: this test task performs no work.
+    }
+
+    @Override
+    public void close() throws IOException, InterruptedException {
+      // Intentionally empty: this test task holds nothing to release.
+    }
+
+  }
+
+  /**
+   * Module pairing {@link SimpleInput}, {@link OnFileSortedOutput} and
+   * {@link MapProcessor} with {@link TestTask}.
+   */
+  static class TestInitialModule extends InitialTask {
+
+    @Override
+    protected void configure() {
+      install(new FactoryModuleBuilder()
+          .implement(Input.class, SimpleInput.class)
+          .build(InputFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Output.class, OnFileSortedOutput.class)
+          .build(OutputFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Processor.class, MapProcessor.class)
+          .build(ProcessorFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Task.class, TestTask.class)
+          .build(TaskFactory.class));
+
+      bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+    }
+
+  }
+
+  /**
+   * Module pairing {@link ShuffledMergedInput}, {@link OnFileSortedOutput} and
+   * {@link ReduceProcessor} with {@link TestTask}.
+   */
+  static class TestIntermediateModule extends IntermediateTask {
+
+    @Override
+    protected void configure() {
+      install(new FactoryModuleBuilder()
+          .implement(Input.class, ShuffledMergedInput.class)
+          .build(InputFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Output.class, OnFileSortedOutput.class)
+          .build(OutputFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Processor.class, ReduceProcessor.class)
+          .build(ProcessorFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Task.class, TestTask.class)
+          .build(TaskFactory.class));
+
+      bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+    }
+
+  }
+
+  /**
+   * Module pairing {@link ShuffledMergedInput}, {@link SimpleOutput} and
+   * {@link ReduceProcessor} with {@link TestTask}.
+   */
+  static class TestFinalModule extends FinalTask {
+
+    @Override
+    protected void configure() {
+      install(new FactoryModuleBuilder()
+          .implement(Output.class, SimpleOutput.class)
+          .build(OutputFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Input.class, ShuffledMergedInput.class)
+          .build(InputFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Processor.class, ReduceProcessor.class)
+          .build(ProcessorFactory.class));
+      install(new FactoryModuleBuilder()
+          .implement(Task.class, TestTask.class)
+          .build(TaskFactory.class));
+
+      bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+    }
+
+  }
+
+}
Added: incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java (added)
+++ incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,150 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.ContainerTask;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+import org.apache.tez.records.TezJobID;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.TezTaskDependencyCompletionEventsUpdate;
+import org.apache.tez.records.OutputContext;
+
+/**
+ * Stub {@link TezTaskUmbilicalProtocol} for tests.
+ *
+ * <p>Notification-style calls are only logged. Query-style calls return fixed
+ * values, except {@link #canCommit} and {@link #proceedToCompletion}, whose
+ * answers depend on the {@code shouldLinger} flag given at construction.
+ */
+public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
+
+  private static final Log LOG = LogFactory.getLog(TestUmbilicalProtocol.class);
+  private final ProceedToCompletionResponse proceedToCompletionResponse;
+  private final boolean shouldLinger;
+
+  /** Creates a stub that does not linger; same as passing {@code false}. */
+  public TestUmbilicalProtocol() {
+    this(false);
+  }
+
+  /**
+   * @param shouldLinger when {@code true}, {@link #canCommit} answers
+   *          {@code false} and the canned proceed-to-completion response is
+   *          built with {@code false} as its second argument; otherwise the
+   *          answers are {@code true}
+   */
+  public TestUmbilicalProtocol(boolean shouldLinger) {
+    this.shouldLinger = shouldLinger;
+    // The response's second argument is always the negation of shouldLinger.
+    this.proceedToCompletionResponse =
+        new ProceedToCompletionResponse(false, !shouldLinger);
+  }
+
+  /** Stub: always returns {@code null}. */
+  @Override
+  public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2)
+      throws IOException {
+    return null;
+  }
+
+  /** Stub: always returns {@code 0}. */
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return 0;
+  }
+
+  /** Stub: always returns {@code null}. */
+  @Override
+  public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
+      TezJobID jobID, int fromEventIdx, int maxEventsToFetch,
+      TezTaskAttemptID reduce) {
+    return null;
+  }
+
+  /** Stub: always returns {@code null}. */
+  @Override
+  public ContainerTask getTask(ContainerContext containerContext)
+      throws IOException {
+    return null;
+  }
+
+  /** Logs the status update and acknowledges it with {@code true}. */
+  @Override
+  public boolean statusUpdate(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    LOG.info("Got 'status-update' from " + taskId + ": status=" + taskStatus);
+    return true;
+  }
+
+  /** Logs the diagnostic trace. */
+  @Override
+  public void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace)
+      throws IOException {
+    LOG.info("Got 'diagnostic-info' from " + taskid + ": trace=" + trace);
+  }
+
+  /** Logs the ping and answers {@code true}. */
+  @Override
+  public boolean ping(TezTaskAttemptID taskid) throws IOException {
+    LOG.info("Got 'ping' from " + taskid);
+    return true;
+  }
+
+  /** Logs the completion notice. */
+  @Override
+  public void done(TezTaskAttemptID taskid) throws IOException {
+    LOG.info("Got 'done' from " + taskid);
+  }
+
+  /** Logs the commit-pending notice. */
+  @Override
+  public void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    LOG.info("Got 'commit-pending' from " + taskId + ": status=" + taskStatus);
+  }
+
+  /** Logs the request and permits the commit unless lingering was requested. */
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+    LOG.info("Got 'can-commit' from " + taskid);
+    return !shouldLinger;
+  }
+
+  /** Logs the shuffle error. */
+  @Override
+  public void shuffleError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    LOG.info("Got 'shuffle-error' from " + taskId + ": message=" + message);
+  }
+
+  /** Logs the file-system error. */
+  @Override
+  public void fsError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    LOG.info("Got 'fs-error' from " + taskId + ": message=" + message);
+  }
+
+  /** Logs the fatal error. */
+  @Override
+  public void fatalError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    LOG.info("Got 'fatal-error' from " + taskId + ": message=" + message);
+  }
+
+  /** Stub: ignores the notification. */
+  @Override
+  public void outputReady(TezTaskAttemptID taskAttemptId,
+      OutputContext outputContext) throws IOException {
+    // Intentionally empty.
+  }
+
+  /** Returns the canned response chosen at construction time. */
+  @Override
+  public ProceedToCompletionResponse proceedToCompletion(
+      TezTaskAttemptID taskAttemptId) throws IOException {
+    return proceedToCompletionResponse;
+  }
+
+}
Added: incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java (added)
+++ incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce;
+
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.records.TezJobID;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.TezTaskID;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Factory methods producing Mockito mocks of the Tez job, task and
+ * task-attempt identifier classes for use in tests.
+ */
+public class TezTestUtils {
+
+  /**
+   * Builds a mocked task-attempt id whose job, task, attempt number, task type
+   * and string form are all stubbed.
+   *
+   * @param jobId numeric id stubbed on the mocked job id
+   * @param taskId numeric id stubbed on the mocked task id
+   * @param taskAttemptId value returned by the mock's {@code getId()}
+   * @param type task type; its {@code toString()} is returned by
+   *          {@code getTaskType()}, and it selects the {@code m}/{@code r}
+   *          marker in the string form
+   * @return the stubbed mock, whose {@code toString()} yields
+   *         {@code attempt_tez_<jobId>_<m|r>_<taskId>_<taskAttemptId>}
+   */
+  public static TezTaskAttemptID getMockTaskAttemptId(
+      int jobId, int taskId, int taskAttemptId, MRTaskType type) {
+    TezTaskAttemptID attempt = mock(TezTaskAttemptID.class);
+    TezTaskID owningTask = getMockTaskId(jobId, taskId, type);
+    TezJobID owningJob = owningTask.getJobID();
+
+    // Only MAP is marked "m"; every other type is marked "r".
+    String typeMarker = (type == MRTaskType.MAP) ? "m" : "r";
+    String attemptName =
+        "attempt_tez_" + jobId + "_" + typeMarker + "_" + taskId + "_"
+            + taskAttemptId;
+
+    when(attempt.getJobID()).thenReturn(owningJob);
+    when(attempt.getTaskID()).thenReturn(owningTask);
+    when(attempt.getId()).thenReturn(taskAttemptId);
+    when(attempt.getTaskType()).thenReturn(type.toString());
+    when(attempt.toString()).thenReturn(attemptName);
+    return attempt;
+  }
+
+  /**
+   * Builds a mocked task id with its task type, numeric id and (mocked) job id
+   * stubbed.
+   *
+   * @param jobId numeric id stubbed on the mocked job id
+   * @param taskId value returned by the mock's {@code getId()}
+   * @param type task type; its {@code toString()} is returned by
+   *          {@code getTaskType()}
+   * @return the stubbed mock
+   */
+  public static TezTaskID getMockTaskId(int jobId, int taskId, MRTaskType type) {
+    TezJobID owningJob = getMockJobId(jobId);
+    TezTaskID task = mock(TezTaskID.class);
+    when(task.getTaskType()).thenReturn(type.toString());
+    when(task.getId()).thenReturn(taskId);
+    when(task.getJobID()).thenReturn(owningJob);
+    return task;
+  }
+
+  /**
+   * Builds a mocked job id whose {@code getId()} returns {@code jobId} and
+   * whose {@code getJtIdentifier()} returns {@code "mock"}.
+   *
+   * @param jobId value returned by the mock's {@code getId()}
+   * @return the stubbed mock
+   */
+  public static TezJobID getMockJobId(int jobId) {
+    TezJobID job = mock(TezJobID.class);
+    when(job.getId()).thenReturn(jobId);
+    when(job.getJtIdentifier()).thenReturn("mock");
+    return job;
+  }
+}
Added: incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java (added)
+++ incubator/tez/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,117 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.processor;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.tez.api.Task;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.mapreduce.TezTestUtils;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.task.InitialTaskWithLocalSort;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class MapUtils {
+
+ private static final Log LOG = LogFactory.getLog(MapUtils.class);
+
+ private static InputSplit
+ createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file)
+ throws IOException {
+ FileInputFormat.setInputPaths(job, workDir);
+
+
+ // create a file with length entries
+ SequenceFile.Writer writer =
+ SequenceFile.createWriter(fs, job, file,
+ LongWritable.class, Text.class);
+ try {
+ Random r = new Random(System.currentTimeMillis());
+ LongWritable key = new LongWritable();
+ Text value = new Text();
+ for (int i = 10; i > 0; i--) {
+ key.set(r.nextInt(1000));
+ value.set(Integer.toString(i));
+ writer.append(key, value);
+ LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
+ }
+ } finally {
+ writer.close();
+ }
+
+ SequenceFileInputFormat<LongWritable, Text> format =
+ new SequenceFileInputFormat<LongWritable, Text>();
+ InputSplit[] splits = format.getSplits(job, 1);
+ System.err.println("#split = " + splits.length + " ; " +
+ "#locs = " + splits[0].getLocations().length + "; " +
+ "loc = " + splits[0].getLocations()[0] + "; " +
+ "off = " + splits[0].getLength() + "; " +
+ "file = " + ((FileSplit)splits[0]).getPath());
+ return splits[0];
+ }
+
+ public static Task runMapProcessor(FileSystem fs, Path workDir,
+ JobConf jobConf,
+ int mapId, Path mapInput, AbstractModule taskModule,
+ TezTaskUmbilicalProtocol umbilical)
+ throws Exception {
+ jobConf.setInputFormat(SequenceFileInputFormat.class);
+ InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput);
+ MRTaskContext taskContext =
+ new MRTaskContext(
+ TezTestUtils.getMockTaskAttemptId(0, mapId, 0, MRTaskType.MAP),
+ "tez", "tez", InitialTaskWithLocalSort.class.getName(), null,
+ ((FileSplit)split).getPath().toString(), 0, 0);
+
+ Injector injector = Guice.createInjector(taskModule);
+ TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+ Task t = factory.createTask(taskContext);
+ t.initialize(jobConf, umbilical);
+ SimpleInput real = ((SimpleInput)t.getInput());
+ SimpleInput in = spy(real);
+ doReturn(split).when(in).getOldSplitDetails(any(TaskSplitIndex.class));
+ t.getProcessor().process(in, t.getOutput());
+ return t;
+ }
+
+}