You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/04/02 20:08:29 UTC

[2/5] flink git commit: Added support for Apache Tez as an execution environment

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
new file mode 100644
index 0000000..01dbbc5
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
+import org.apache.flink.runtime.operators.util.CloseableInputProvider;
+import org.apache.flink.tez.runtime.input.TezReaderIterator;
+import org.apache.flink.tez.util.DummyInvokable;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tez processor that executes a Flink data sink. It consumes exactly one Tez input, optionally
+ * sorts the records according to the configured local strategy, and writes them to the
+ * user-defined {@link OutputFormat}.
+ *
+ * @param <IT> the type of the records written by the sink
+ */
+public class DataSinkProcessor<IT> extends AbstractLogicalIOProcessor {
+
+	// Tez stuff
+	private TezTaskConfig config;
+	protected Map<String, LogicalInput> inputs;
+	private List<KeyValueReader> readers;
+	private int numInputs;
+	private TezRuntimeEnvironment runtimeEnvironment;
+	// Invokable handed to the UnilateralSortMerger when the SORT local strategy is used.
+	AbstractInvokable invokable = new DummyInvokable();
+
+	// Flink stuff
+	private OutputFormat<IT> format;
+	private ClassLoader userCodeClassLoader = this.getClass().getClassLoader();
+	// Local strategy applied to the input (null for NONE); closed when invoke() finishes.
+	private CloseableInputProvider<IT> localStrategy;
+	// input reader, before the local strategy is applied
+	private MutableObjectIterator<IT> reader;
+	// input iterator, after the local strategy is applied
+	private MutableObjectIterator<IT> input;
+	private TypeSerializerFactory<IT> inputTypeSerializerFactory;
+
+	public DataSinkProcessor(ProcessorContext context) {
+		super(context);
+	}
+
+	/**
+	 * Decodes the {@link TezTaskConfig} from the Tez user payload, creates the runtime environment
+	 * (memory and I/O manager) with 70% of the memory available to the task, and instantiates and
+	 * configures the user's output format.
+	 */
+	@Override
+	public void initialize() throws Exception {
+		UserPayload payload = getContext().getUserPayload();
+		Configuration conf = TezUtils.createConfFromUserPayload(payload);
+
+		this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
+		config.setTaskName(getContext().getTaskVertexName());
+
+		this.runtimeEnvironment = new TezRuntimeEnvironment((long) (0.7 * this.getContext().getTotalMemoryAvailableToTask()));
+
+		this.inputTypeSerializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
+
+		initOutputFormat();
+	}
+
+	/**
+	 * Starts the single input, wraps its reader in an iterator and writes all records to the
+	 * output format.
+	 *
+	 * @param inputs the Tez inputs; must contain exactly one entry
+	 * @param outputs the Tez outputs; must be null or empty, since a sink produces no Tez output
+	 * @throws IllegalArgumentException if the input/output arity is wrong
+	 */
+	@Override
+	public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
+
+		Preconditions.checkArgument((outputs == null) || outputs.isEmpty(),
+				"A data sink must not have any outputs.");
+		Preconditions.checkArgument((inputs != null) && (inputs.size() == 1),
+				"A data sink must have exactly one input.");
+
+		this.inputs = inputs;
+		this.numInputs = inputs.size();
+		this.readers = new ArrayList<KeyValueReader>(numInputs);
+		for (LogicalInput logicalInput : inputs.values()) {
+			logicalInput.start();
+			readers.add((KeyValueReader) logicalInput.getReader());
+		}
+
+		this.reader = new TezReaderIterator<IT>(readers.get(0));
+
+		this.invoke();
+	}
+
+	/** This processor does not react to processor events. */
+	@Override
+	public void handleEvents(List<Event> processorEvents) {
+
+	}
+
+	/** Shuts down the I/O manager, if {@link #initialize()} got far enough to create it. */
+	@Override
+	public void close() throws Exception {
+		if (this.runtimeEnvironment != null) {
+			this.runtimeEnvironment.getIOManager().shutdown();
+		}
+	}
+
+	/**
+	 * Sets up the local strategy, opens the output format and writes every input record to it.
+	 * The output format and the local strategy are closed whether or not writing succeeds.
+	 *
+	 * @throws RuntimeException if the local strategy cannot be set up or an I/O error occurs;
+	 *         I/O errors are attached as the cause
+	 */
+	private void invoke() {
+		try {
+			initLocalStrategy();
+
+			final TypeSerializer<IT> serializer = this.inputTypeSerializerFactory.getSerializer();
+			final MutableObjectIterator<IT> input = this.input;
+			final OutputFormat<IT> format = this.format;
+
+			IT record = serializer.createInstance();
+			format.open(this.getContext().getTaskIndex(), this.getContext().getVertexParallelism());
+
+			// work!
+			while ((record = input.next(record)) != null) {
+				format.writeRecord(record);
+			}
+
+			this.format.close();
+			// mark as closed so the finally block does not close it a second time
+			this.format = null;
+		}
+		catch (IOException e) {
+			// keep the original exception as the cause instead of discarding it
+			throw new RuntimeException("The data sink failed while writing records"
+					+ (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
+		}
+		finally {
+			if (this.format != null) {
+				// close format, if it has not been closed, yet.
+				// This should only be the case if we had a previous error, or were canceled.
+				try {
+					this.format.close();
+				}
+				catch (Throwable ignored) {
+					// best effort: a root cause is already propagating. TODO log warning message
+				}
+			}
+			// close local strategy if necessary
+			if (this.localStrategy != null) {
+				try {
+					this.localStrategy.close();
+				} catch (Throwable ignored) {
+					// best effort cleanup. TODO log warning message
+				}
+			}
+		}
+	}
+
+	/**
+	 * Initializes the local strategy of input 0 and points {@link #input} at the iterator the sink
+	 * consumes: the raw reader for NONE, or the sorted output of a {@link UnilateralSortMerger}
+	 * for SORT.
+	 *
+	 * @throws RuntimeException if the strategy is unsupported or the sorter cannot be created
+	 */
+	private void initLocalStrategy() {
+		switch (this.config.getInputLocalStrategy(0)) {
+			case NONE:
+				// nothing to do
+				this.localStrategy = null;
+				this.input = this.reader;
+				break;
+			case SORT:
+				try {
+					// get type comparator
+					TypeComparatorFactory<IT> compFact = this.config.getInputComparator(0, this.userCodeClassLoader);
+					if (compFact == null) {
+						throw new Exception("Missing comparator factory for local strategy on input " + 0);
+					}
+
+					// initialize sorter
+					UnilateralSortMerger<IT> sorter = new UnilateralSortMerger<IT>(
+							this.runtimeEnvironment.getMemoryManager(),
+							this.runtimeEnvironment.getIOManager(),
+							this.reader, this.invokable, this.inputTypeSerializerFactory, compFact.createComparator(),
+							this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0),
+							this.config.getSpillingThresholdInput(0));
+
+					this.localStrategy = sorter;
+					this.input = sorter.getIterator();
+				} catch (Exception e) {
+					// The conditional must be parenthesized: '+' binds tighter than '==', so without the
+					// parentheses the (never-null) concatenation was compared to null and the prefix was lost.
+					throw new RuntimeException("Initializing the input processing failed" +
+							(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
+				}
+				break;
+			default:
+				throw new RuntimeException("Invalid local strategy for DataSinkTask");
+		}
+	}
+
+	/**
+	 * Instantiates the user's output format from the task config and calls its
+	 * {@code configure()} method with the stub parameters.
+	 *
+	 * @throws RuntimeException if the class is not an {@link OutputFormat} or configuration fails
+	 */
+	private void initOutputFormat() {
+		try {
+			this.format = this.config.<OutputFormat<IT>>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(OutputFormat.class, this.userCodeClassLoader);
+
+			// check if the class is a subclass, if the check is required
+			if (!OutputFormat.class.isAssignableFrom(this.format.getClass())) {
+				throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" +
+						OutputFormat.class.getName() + "' as is required.");
+			}
+		}
+		catch (ClassCastException ccex) {
+			throw new RuntimeException("The stub class is not a proper subclass of " + OutputFormat.class.getName(), ccex);
+		}
+
+		// configure the stub. catch exceptions here extra, to report them as originating from the user code
+		try {
+			this.format.configure(this.config.getStubParameters());
+		}
+		catch (Throwable t) {
+			throw new RuntimeException("The user defined 'configure()' method in the Output Format caused an error: "
+					+ t.getMessage(), t);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java
new file mode 100644
index 0000000..dd3f843
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.tez.runtime.input.FlinkInput;
+import org.apache.flink.tez.runtime.output.TezChannelSelector;
+import org.apache.flink.tez.runtime.output.TezOutputCollector;
+import org.apache.flink.tez.runtime.output.TezOutputEmitter;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Tez processor that executes a Flink data source. It reads the input split provided by its single
+ * {@link FlinkInput}, produces records with the user's {@link InputFormat}, and emits them to all
+ * Tez outputs through a {@link TezOutputCollector}.
+ *
+ * @param <OT> the type of the records produced by the source
+ */
+public class DataSourceProcessor<OT> extends AbstractLogicalIOProcessor {
+
+	private TezTaskConfig config;
+	protected Map<String, LogicalOutput> outputs;
+	private List<KeyValueWriter> writers;
+	private int numOutputs;
+	private Collector<OT> collector;
+
+	private InputFormat<OT, InputSplit> format;
+	private TypeSerializerFactory<OT> serializerFactory;
+	private FlinkInput input;
+	private ClassLoader userCodeClassLoader = getClass().getClassLoader();
+
+
+	public DataSourceProcessor(ProcessorContext context) {
+		super(context);
+	}
+
+	/**
+	 * Decodes the {@link TezTaskConfig} from the Tez user payload, obtains the output serializer
+	 * and instantiates and configures the user's input format.
+	 */
+	@Override
+	public void initialize() throws Exception {
+		UserPayload payload = getContext().getUserPayload();
+		Configuration conf = TezUtils.createConfFromUserPayload(payload);
+
+		this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
+		config.setTaskName(getContext().getTaskVertexName());
+
+		this.serializerFactory = config.getOutputSerializer(this.userCodeClassLoader);
+
+		initInputFormat();
+	}
+
+	/** This processor does not react to processor events. */
+	@Override
+	public void handleEvents(List<Event> processorEvents) {
+
+	}
+
+	@Override
+	public void close() throws Exception {
+
+	}
+
+	/**
+	 * Validates the single {@link FlinkInput}, starts all outputs and runs the source.
+	 *
+	 * @param inputs the Tez inputs; must contain exactly one {@link FlinkInput}
+	 * @param outputs the Tez outputs; may be null, which is treated as "no outputs"
+	 * @throws Exception any failure while reading from the input format or emitting records
+	 */
+	@Override
+	public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
+
+		Preconditions.checkArgument(inputs.size() == 1);
+		LogicalInput logicalInput = inputs.values().iterator().next();
+		if (!(logicalInput instanceof FlinkInput)) {
+			throw new RuntimeException("Input to Flink Data Source Processor should be of type FlinkInput");
+		}
+		this.input = (FlinkInput) logicalInput;
+
+		// Start the outputs and collect their writers.
+		this.outputs = outputs;
+		this.numOutputs = (outputs == null) ? 0 : outputs.size();
+		this.writers = new ArrayList<KeyValueWriter>(numOutputs);
+		if (outputs != null) {
+			for (LogicalOutput output : outputs.values()) {
+				output.start();
+				writers.add((KeyValueWriter) output.getWriter());
+			}
+		}
+		this.invoke();
+	}
+
+
+	/**
+	 * Opens the input format on the split, builds one channel selector per output, and ships every
+	 * produced record to the collector.
+	 *
+	 * @throws Exception any failure while reading or emitting; the input format is closed (best
+	 *         effort) before the exception is rethrown
+	 */
+	private void invoke() throws Exception {
+		final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
+		final InputFormat<OT, InputSplit> format = this.format;
+		try {
+			InputSplit split = input.getSplit();
+
+			OT record = serializer.createInstance();
+			format.open(split);
+
+			ArrayList<TezChannelSelector<OT>> channelSelectors = new ArrayList<TezChannelSelector<OT>>(numOutputs);
+			ArrayList<Integer> numStreamsInOutputs = this.config.getNumberSubtasksInOutput();
+			for (int i = 0; i < numOutputs; i++) {
+				final ShipStrategyType strategy = config.getOutputShipStrategy(i);
+				final TypeComparatorFactory<OT> compFactory = config.getOutputComparator(i, this.userCodeClassLoader);
+				final DataDistribution dataDist = config.getOutputDataDistribution(i, this.userCodeClassLoader);
+				if (compFactory == null) {
+					channelSelectors.add(i, new TezOutputEmitter<OT>(strategy));
+				} else if (dataDist == null) {
+					final TypeComparator<OT> comparator = compFactory.createComparator();
+					channelSelectors.add(i, new TezOutputEmitter<OT>(strategy, comparator));
+				} else {
+					final TypeComparator<OT> comparator = compFactory.createComparator();
+					channelSelectors.add(i, new TezOutputEmitter<OT>(strategy, comparator, dataDist));
+				}
+			}
+			collector = new TezOutputCollector<OT>(writers, channelSelectors, serializerFactory.getSerializer(), numStreamsInOutputs);
+
+			while (!format.reachedEnd()) {
+				// build next pair and ship pair if it is valid
+				if ((record = format.nextRecord(record)) != null) {
+					collector.collect(record);
+				}
+			}
+			format.close();
+
+			collector.close();
+		}
+		catch (Exception ex) {
+			// close the input, but do not report any exceptions, since we already have another root cause
+			try {
+				format.close();
+			} catch (Throwable ignored) {
+				// best effort: the original exception is rethrown below
+			}
+			// Rethrow so that Tez marks the task as failed instead of reporting success
+			// with partial (or no) output.
+			throw ex;
+		}
+	}
+
+
+	/**
+	 * Instantiates the user's input format from the task config and calls its
+	 * {@code configure()} method with the stub parameters.
+	 *
+	 * @throws RuntimeException if the class is not an {@link InputFormat} or configuration fails
+	 */
+	private void initInputFormat() {
+		try {
+			this.format = config.<InputFormat<OT, InputSplit>>getStubWrapper(this.userCodeClassLoader)
+					.getUserCodeObject(InputFormat.class, this.userCodeClassLoader);
+
+			// check if the class is a subclass, if the check is required
+			if (!InputFormat.class.isAssignableFrom(this.format.getClass())) {
+				throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" +
+						InputFormat.class.getName() + "' as is required.");
+			}
+		}
+		catch (ClassCastException ccex) {
+			throw new RuntimeException("The stub class is not a proper subclass of " + InputFormat.class.getName(),
+					ccex);
+		}
+		// configure the stub. catch exceptions here extra, to report them as originating from the user code
+		try {
+			this.format.configure(this.config.getStubParameters());
+		}
+		catch (Throwable t) {
+			throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
+		}
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
new file mode 100644
index 0000000..6b39734
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class RegularProcessor<S extends Function, OT> extends AbstractLogicalIOProcessor {
+
+	private TezTask<S,OT> task;
+	protected Map<String, LogicalInput> inputs;
+	protected Map<String, LogicalOutput> outputs;
+	private List<KeyValueReader> readers;
+	private List<KeyValueWriter> writers;
+	private int numInputs;
+	private int numOutputs;
+
+
+	public RegularProcessor(ProcessorContext context) {
+		super(context);
+	}
+
+	@Override
+	public void initialize() throws Exception {
+		UserPayload payload = getContext().getUserPayload();
+		Configuration conf = TezUtils.createConfFromUserPayload(payload);
+
+		TezTaskConfig taskConfig = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
+		taskConfig.setTaskName(getContext().getTaskVertexName());
+
+		RuntimeUDFContext runtimeUdfContext = new RuntimeUDFContext(getContext().getTaskVertexName(),
+				getContext().getVertexParallelism(),
+				getContext().getTaskIndex(),
+				getClass().getClassLoader(),
+				new ExecutionConfig());
+
+		this.task = new TezTask<S, OT>(taskConfig, runtimeUdfContext, this.getContext().getTotalMemoryAvailableToTask());
+	}
+
+	@Override
+	public void handleEvents(List<Event> processorEvents) {
+
+	}
+
+	@Override
+	public void close() throws Exception {
+		task.getIOManager().shutdown();
+	}
+
+	@Override
+	public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
+
+		this.inputs = inputs;
+		this.outputs = outputs;
+		final Class<? extends PactDriver<S, OT>> driverClass = this.task.getTaskConfig().getDriver();
+		PactDriver<S,OT> driver = InstantiationUtil.instantiate(driverClass, PactDriver.class);
+		this.numInputs = driver.getNumberOfInputs();
+		this.numOutputs = outputs.size();
+
+
+		this.readers = new ArrayList<KeyValueReader>(numInputs);
+		//Ensure size of list is = numInputs
+		for (int i = 0; i < numInputs; i++)
+			this.readers.add(null);
+		HashMap<String, ArrayList<Integer>> inputPositions = ((TezTaskConfig) this.task.getTaskConfig()).getInputPositions();
+		if (this.inputs != null) {
+			for (String name : this.inputs.keySet()) {
+				LogicalInput input = this.inputs.get(name);
+				//if (input instanceof AbstractLogicalInput) {
+				//	((AbstractLogicalInput) input).initialize();
+				//}
+				input.start();
+				ArrayList<Integer> positions = inputPositions.get(name);
+				for (Integer pos : positions) {
+					//int pos = inputPositions.get(name);
+					readers.set(pos, (KeyValueReader) input.getReader());
+				}
+			}
+		}
+
+		this.writers = new ArrayList<KeyValueWriter>(numOutputs);
+		if (this.outputs != null) {
+			for (LogicalOutput output : this.outputs.values()) {
+				output.start();
+				writers.add((KeyValueWriter) output.getWriter());
+			}
+		}
+
+		// Do the work
+		task.invoke (readers, writers);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java
new file mode 100644
index 0000000..39386e6
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+
+/**
+ * Holds the Flink {@link MemoryManager} and {@link IOManager} used by a Tez processor.
+ */
+public class TezRuntimeEnvironment {
+
+	/** Default page size passed to the memory manager (presumably bytes, i.e. 32 KiB). */
+	private static final int DEFAULT_PAGE_SIZE = 32768;
+
+	/** Default number of slots passed to the memory manager. */
+	private static final int DEFAULT_NUM_SLOTS = 10;
+
+	private final IOManager ioManager;
+
+	private final MemoryManager memoryManager;
+
+	/**
+	 * Creates an environment with the default number of slots and page size.
+	 *
+	 * @param totalMemory the memory handed to the memory manager
+	 */
+	public TezRuntimeEnvironment(long totalMemory) {
+		this(totalMemory, DEFAULT_NUM_SLOTS, DEFAULT_PAGE_SIZE);
+	}
+
+	/**
+	 * Creates an environment with an explicit memory-manager configuration.
+	 *
+	 * @param totalMemory the memory handed to the memory manager
+	 * @param numSlots the number of slots of the memory manager
+	 * @param pageSize the page size of the memory manager
+	 */
+	public TezRuntimeEnvironment(long totalMemory, int numSlots, int pageSize) {
+		this.memoryManager = new DefaultMemoryManager(totalMemory, numSlots, pageSize);
+		this.ioManager = new IOManagerAsync();
+	}
+
+	public IOManager getIOManager() {
+		return ioManager;
+	}
+
+	public MemoryManager getMemoryManager() {
+		return memoryManager;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
new file mode 100644
index 0000000..90df992
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.runtime.operators.PactTaskContext;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.runtime.operators.util.CloseableInputProvider;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.tez.runtime.input.TezReaderIterator;
+import org.apache.flink.tez.runtime.output.TezChannelSelector;
+import org.apache.flink.tez.runtime.output.TezOutputEmitter;
+import org.apache.flink.tez.runtime.output.TezOutputCollector;
+import org.apache.flink.tez.util.DummyInvokable;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import java.io.IOException;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+
+public class TezTask<S extends Function,OT>  implements PactTaskContext<S, OT> {
+
+	/** Logger for this class. */
+	protected static final Log LOG = LogFactory.getLog(TezTask.class);
+
+	/**
+	 * Placeholder invokable; its execution config is set in the constructor.
+	 * NOTE(review): presumably handed to the sorters that require an owning invokable — confirm.
+	 */
+	DummyInvokable invokable = new DummyInvokable();
+
+	/**
+	 * The driver that invokes the user code (the stub implementation). The central driver in this task
+	 * (further drivers may be chained behind this driver).
+	 */
+	protected volatile PactDriver<S, OT> driver;
+
+	/**
+	 * The instantiated user code of this task's main operator (driver). May be null if the operator has no udf.
+	 */
+	protected S stub;
+
+	/**
+	 * The udf's runtime context.
+	 */
+	protected RuntimeUDFContext runtimeUdfContext;
+
+	/**
+	 * The collector that forwards the user code's results. May forward to a channel or to chained drivers within
+	 * this task.
+	 */
+	protected Collector<OT> output;
+
+	/**
+	 * The input readers, each wrapped in an iterator. These yield the records before any local
+	 * strategy is applied.
+	 */
+	protected MutableObjectIterator<?>[] inputIterators;
+
+	/**
+	 * The local strategies that are applied on the inputs.
+	 */
+	protected volatile CloseableInputProvider<?>[] localStrategies;
+
+	/**
+	 * The inputs to the operator. Return the readers' data after the application of the local strategy
+	 * and the temp-table barrier.
+	 */
+	protected MutableObjectIterator<?>[] inputs;
+
+	/**
+	 * The serializers for the input data type.
+	 */
+	protected TypeSerializerFactory<?>[] inputSerializers;
+
+	/**
+	 * The comparators for the central driver.
+	 */
+	protected TypeComparator<?>[] inputComparators;
+
+	/**
+	 * The task configuration with the setup parameters.
+	 */
+	protected TezTaskConfig config;
+
+	/**
+	 * The class loader used to instantiate user code and user data types. Currently always the
+	 * system class loader.
+	 */
+	protected ClassLoader userCodeClassLoader = ClassLoader.getSystemClassLoader();
+
+	/**
+	 * The execution config, taken from the runtime UDF context in the constructor.
+	 */
+	protected ExecutionConfig executionConfig;
+
+	/*
+	 * Tez-specific variables given by the Processor
+	 */
+	// Serializer for the records this task emits, obtained from the task config's output serializer.
+	protected TypeSerializer<OT> outSerializer;
+
+	// Subtask counts of the outputs as stored in the task config (presumably one entry per output — confirm).
+	protected List<Integer> numberOfSubTasksInOutputs;
+
+	// Task name, read from the task config.
+	protected String taskName;
+
+	// Parallelism of this vertex, read from the runtime UDF context.
+	protected int numberOfSubtasks;
+
+	// Index of this subtask, read from the runtime UDF context.
+	protected int indexInSubtaskGroup;
+
+	// Memory and I/O managers for this task; created with 70% of the available memory.
+	TezRuntimeEnvironment runtimeEnvironment;
+
+	/**
+	 * Creates the task: instantiates the driver and the user function described by the task config
+	 * and sets up a runtime environment with 70% of {@code availableMemory}.
+	 *
+	 * @param config the decoded task configuration
+	 * @param runtimeUdfContext the UDF runtime context; it also supplies the parallelism, the subtask
+	 *        index and the execution config
+	 * @param availableMemory the total memory available to the Tez task
+	 */
+	@SuppressWarnings("unchecked")
+	public TezTask(TezTaskConfig config, RuntimeUDFContext runtimeUdfContext, long availableMemory) {
+		this.config = config;
+		final Class<? extends PactDriver<S, OT>> driverClass = this.config.getDriver();
+		this.driver = InstantiationUtil.instantiate(driverClass, PactDriver.class);
+
+		// The system class loader is not guaranteed to be a URLClassLoader (it is not on Java 9+),
+		// so only log its URLs when the cast is valid instead of failing task creation.
+		if (this.userCodeClassLoader instanceof URLClassLoader) {
+			LOG.info("ClassLoader URLs: " + Arrays.toString(((URLClassLoader) this.userCodeClassLoader).getURLs()));
+		} else {
+			LOG.info("ClassLoader: " + this.userCodeClassLoader);
+		}
+
+		this.stub = this.config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(Function.class, this.userCodeClassLoader); //TODO get superclass properly
+		this.runtimeUdfContext = runtimeUdfContext;
+		this.outSerializer = (TypeSerializer<OT>) this.config.getOutputSerializer(getClass().getClassLoader()).getSerializer();
+		this.numberOfSubTasksInOutputs = this.config.getNumberSubtasksInOutput();
+		this.taskName = this.config.getTaskName();
+		this.numberOfSubtasks = this.runtimeUdfContext.getNumberOfParallelSubtasks();
+		this.indexInSubtaskGroup = this.runtimeUdfContext.getIndexOfThisSubtask();
+		this.runtimeEnvironment = new TezRuntimeEnvironment((long) (0.7 * availableMemory));
+		this.executionConfig = runtimeUdfContext.getExecutionConfig();
+		this.invokable.setExecutionConfig(this.executionConfig);
+	}
+
+
+	//-------------------------------------------------------------
+	// Interface to FlinkProcessor
+	//-------------------------------------------------------------
+
+	public void invoke(List<KeyValueReader> readers, List<KeyValueWriter> writers) throws Exception {
+
+		// whatever happens in this scope, make sure that the local strategies are cleaned up!
+		// note that the initialization of the local strategies is in the try-finally block as well,
+		// so that the thread that creates them catches its own errors that may happen in that process.
+		// this is especially important, since there may be asynchronous closes (such as through canceling).
+		try {
+			// initialize the inputs and outputs
+			initInputsOutputs(readers, writers);
+
+			// pre main-function initialization
+			initialize();
+
+			// the work goes here
+			run();
+		}
+		finally {
+			// clean up in any case!
+			closeLocalStrategies();
+		}
+	}
+
+
+	/*
+	 * Wires the Tez readers and writers into the task:
+	 * - creates the input serializers and the driver comparators,
+	 * - wraps every reader into an iterator and sets up its local strategy,
+	 * - creates one channel selector per writer and the output collector on top of them.
+	 * Assumes that the config and userCodeClassLoader have been set.
+	 */
+	private void initInputsOutputs (List<KeyValueReader> readers, List<KeyValueWriter> writers) throws Exception {
+
+		final int numInputs = readers.size();
+		final int expectedInputs = driver.getNumberOfInputs();
+		Preconditions.checkArgument(numInputs == expectedInputs,
+				"Task '%s' received %s input(s), but its driver expects %s.", this.taskName, numInputs, expectedInputs);
+
+		// raw iterators over the readers, i.e. prior to local strategies
+		this.inputIterators = new MutableObjectIterator[numInputs];
+		// local strategies (sorters), where configured
+		this.localStrategies = new CloseableInputProvider[numInputs];
+		// what the driver consumes, i.e. after local strategies
+		this.inputs = new MutableObjectIterator[numInputs];
+
+		initInputsSerializersAndComparators(numInputs, driver.getNumberOfDriverComparators());
+
+		int index = 0;
+		for (KeyValueReader reader : readers) {
+			this.inputIterators[index] = new TezReaderIterator<Object>(reader);
+			initInputLocalStrategy(index);
+			index++;
+		}
+
+		final int numOutputs = writers.size();
+		final ArrayList<TezChannelSelector<OT>> channelSelectors = new ArrayList<TezChannelSelector<OT>>(numOutputs);
+		for (int i = 0; i < numOutputs; i++) {
+			final ShipStrategyType strategy = config.getOutputShipStrategy(i);
+			final TypeComparatorFactory<OT> compFactory = config.getOutputComparator(i, this.userCodeClassLoader);
+			final DataDistribution dataDist = config.getOutputDataDistribution(i, this.userCodeClassLoader);
+			channelSelectors.add(createOutputEmitter(strategy, compFactory, dataDist));
+		}
+		this.output = new TezOutputCollector<OT>(writers, channelSelectors, outSerializer, numberOfSubTasksInOutputs);
+	}
+
+	/*
+	 * Creates the emitter for one output. The comparator is only created when a comparator factory
+	 * is configured; the data distribution is only passed on when one is configured as well.
+	 */
+	private TezOutputEmitter<OT> createOutputEmitter(ShipStrategyType strategy, TypeComparatorFactory<OT> compFactory,
+			DataDistribution dataDist) {
+		if (compFactory == null) {
+			return new TezOutputEmitter<OT>(strategy);
+		}
+		final TypeComparator<OT> comparator = compFactory.createComparator();
+		return dataDist == null
+				? new TezOutputEmitter<OT>(strategy, comparator)
+				: new TezOutputEmitter<OT>(strategy, comparator, dataDist);
+	}
+
+
+
+	// --------------------------------------------------------------------
+	// PactTaskContext interface
+	// --------------------------------------------------------------------
+
+	/** Returns the configuration this task was created with. */
+	@Override
+	public TaskConfig getTaskConfig() {
+		final TaskConfig taskConfig = this.config;
+		return taskConfig;
+	}
+
+	/** Returns the execution config taken from the runtime UDF context at construction time. */
+	@Override
+	public ExecutionConfig getExecutionConfig() {
+		return this.executionConfig;
+	}
+
+	/** Returns the class loader used to load user code such as the stub, comparators and serializers. */
+	@Override
+	public ClassLoader getUserCodeClassLoader() {
+		return userCodeClassLoader;
+	}
+
+	/** Returns the memory manager of this task's runtime environment. */
+	@Override
+	public MemoryManager getMemoryManager() {
+		final TezRuntimeEnvironment environment = this.runtimeEnvironment;
+		return environment.getMemoryManager();
+	}
+
+	/** Returns the I/O manager of this task's runtime environment. */
+	@Override
+	public IOManager getIOManager() {
+		final TezRuntimeEnvironment environment = this.runtimeEnvironment;
+		return environment.getIOManager();
+	}
+
+	@Override
+	public <X> MutableObjectIterator<X> getInput(int index) {
+		if (index < 0 || index > this.driver.getNumberOfInputs()) {
+			throw new IndexOutOfBoundsException();
+		}
+		// check for lazy assignment from input strategies
+		if (this.inputs[index] != null) {
+			@SuppressWarnings("unchecked")
+			MutableObjectIterator<X> in = (MutableObjectIterator<X>) this.inputs[index];
+			return in;
+		} else {
+			final MutableObjectIterator<X> in;
+			try {
+				if (this.localStrategies[index] != null) {
+					@SuppressWarnings("unchecked")
+					MutableObjectIterator<X> iter = (MutableObjectIterator<X>) this.localStrategies[index].getIterator();
+					in = iter;
+				} else {
+					throw new RuntimeException("Bug: null input iterator, null temp barrier, and null local strategy.");
+				}
+				this.inputs[index] = in;
+				return in;
+			} catch (InterruptedException iex) {
+				throw new RuntimeException("Interrupted while waiting for input " + index + " to become available.");
+			} catch (IOException ioex) {
+				throw new RuntimeException("An I/O Exception occurred whily obaining input " + index + ".");
+			}
+		}
+	}
+
+	/**
+	 * Returns the serializer factory created for the given input.
+	 *
+	 * @throws IndexOutOfBoundsException if the index is not a valid driver input
+	 */
+	@Override
+	public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
+		final boolean validIndex = index >= 0 && index < this.driver.getNumberOfInputs();
+		if (!validIndex) {
+			throw new IndexOutOfBoundsException();
+		}
+		@SuppressWarnings("unchecked")
+		final TypeSerializerFactory<X> factory = (TypeSerializerFactory<X>) this.inputSerializers[index];
+		return factory;
+	}
+
+	@Override
+	public <X> TypeComparator<X> getDriverComparator(int index) {
+		if (this.inputComparators == null) {
+			throw new IllegalStateException("Comparators have not been created!");
+		}
+		else if (index < 0 || index >= this.driver.getNumberOfDriverComparators()) {
+			throw new IndexOutOfBoundsException();
+		}
+
+		@SuppressWarnings("unchecked")
+		final TypeComparator<X> comparator = (TypeComparator<X>) this.inputComparators[index];
+		return comparator;
+	}
+
+
+
+	/** Returns the user function (created in the constructor, replaced in initialize() when the driver declares a stub type). */
+	@Override
+	public S getStub() {
+		return stub;
+	}
+
+	/** Returns the collector that routes records to the Tez writers (created in initInputsOutputs). */
+	@Override
+	public Collector<OT> getOutputCollector() {
+		return output;
+	}
+
+	/** Returns the invokable reported as the owner of this task; the same object is passed to the sorters. */
+	@Override
+	public AbstractInvokable getOwningNepheleTask() {
+		return invokable;
+	}
+
+	/**
+	 * Decorates a log message with the task name and this subtask's position, e.g. {@code "msg (Map) (2/4)"}.
+	 *
+	 * @param message the message to decorate
+	 * @return the decorated message; never {@code null} (previously this returned {@code null})
+	 */
+	@Override
+	public String formatLogString(String message) {
+		// subtask indices are zero-based, the printed position is one-based
+		return message + " (" + this.taskName + ") (" + (this.indexInSubtaskGroup + 1) + "/" + this.numberOfSubtasks + ")";
+	}
+
+
+	// --------------------------------------------------------------------
+	// Adapted from RegularPactTask
+	// --------------------------------------------------------------------
+
+	/**
+	 * Sets up the local strategy of one input, as configured in the task config.
+	 *
+	 * <p>With NONE (or no configured strategy), the driver reads the raw input iterator directly. With
+	 * SORT and COMBININGSORT, the raw iterator is wrapped into a sorter, and the driver's input is left
+	 * {@code null} so that it is fetched lazily from the sorter.
+	 *
+	 * @param inputNum the input index
+	 * @throws IllegalStateException if a strategy already exists for the input, or a combining sort is
+	 *                               requested where it is not supported
+	 * @throws Exception for unrecognized strategies or a missing comparator factory
+	 */
+	private void initInputLocalStrategy(int inputNum) throws Exception {
+		// check if there is already a strategy
+		if (this.localStrategies[inputNum] != null) {
+			throw new IllegalStateException("A local strategy has already been set up for input " + inputNum + ".");
+		}
+
+		// now set up the local strategy
+		final LocalStrategy localStrategy = this.config.getInputLocalStrategy(inputNum);
+		if (localStrategy != null) {
+			switch (localStrategy) {
+				case NONE:
+					// the input is as it is
+					this.inputs[inputNum] = this.inputIterators[inputNum];
+					break;
+				case SORT:
+					@SuppressWarnings({ "rawtypes", "unchecked" })
+					UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
+							this.inputIterators[inputNum], this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
+							this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
+							this.config.getSpillingThresholdInput(inputNum));
+					// set the input to null such that it will be lazily fetched from the input strategy
+					this.inputs[inputNum] = null;
+					this.localStrategies[inputNum] = sorter;
+					break;
+				case COMBININGSORT:
+					// sanity check this special case!
+					// this still breaks a bit of the abstraction!
+					// we should have nested configurations for the local strategies to solve that
+					if (inputNum != 0) {
+						throw new IllegalStateException("Performing combining sort outside a (group)reduce task!");
+					}
+
+					// instantiate ourselves a combiner. we should not use the stub, because the sort and the
+					// subsequent (group)reduce would otherwise share it multi-threaded
+					final Class<S> userCodeFunctionType = this.driver.getStubType();
+					if (userCodeFunctionType == null) {
+						throw new IllegalStateException("Performing combining sort outside a reduce task!");
+					}
+					final S localStub;
+					try {
+						localStub = initStub(userCodeFunctionType);
+					} catch (Exception e) {
+						// the conditional must be parenthesized; otherwise the whole concatenation is compared to null
+						throw new RuntimeException("Initializing the user code and the configuration failed" +
+								(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
+					}
+
+					if (!(localStub instanceof GroupCombineFunction)) {
+						throw new IllegalStateException("Performing combining sort outside a reduce task!");
+					}
+
+					@SuppressWarnings({ "rawtypes", "unchecked" })
+					CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger(
+							(GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
+							this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
+							this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
+							this.config.getSpillingThresholdInput(inputNum));
+					cSorter.setUdfConfiguration(this.config.getStubParameters());
+
+					// set the input to null such that it will be lazily fetched from the input strategy
+					this.inputs[inputNum] = null;
+					this.localStrategies[inputNum] = cSorter;
+					break;
+				default:
+					throw new Exception("Unrecognized local strategy provided: " + localStrategy.name());
+			}
+		} else {
+			// no local strategy in the config
+			this.inputs[inputNum] = this.inputIterators[inputNum];
+		}
+	}
+
+	private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) throws Exception {
+		TypeComparatorFactory<T> compFact = this.config.getInputComparator(inputNum, this.userCodeClassLoader);
+		if (compFact == null) {
+			throw new Exception("Missing comparator factory for local strategy on input " + inputNum);
+		}
+		return compFact.createComparator();
+	}
+
+	protected S initStub(Class<? super S> stubSuperClass) throws Exception {
+		try {
+			S stub = config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(stubSuperClass, this.userCodeClassLoader);
+			// check if the class is a subclass, if the check is required
+			if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) {
+				throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" +
+						stubSuperClass.getName() + "' as is required.");
+			}
+			FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext);
+			return stub;
+		}
+		catch (ClassCastException ccex) {
+			throw new Exception("The stub class is not a proper subclass of " + stubSuperClass.getName(), ccex);
+		}
+	}
+
+	/**
+	 * Creates all the serializers and comparators.
+	 *
+	 * <p>Allocates one serializer factory and one (empty) iterator slot per input. The driver comparator
+	 * array is only allocated when {@code numComparators > 0}; otherwise it is set to {@code null}.
+	 *
+	 * @param numInputs number of driver inputs
+	 * @param numComparators number of driver comparators
+	 */
+	protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
+		this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
+		if (numComparators > 0) {
+			this.inputComparators = new TypeComparator[numComparators];
+		} else {
+			this.inputComparators = null;
+		}
+		this.inputIterators = new MutableObjectIterator[numInputs];
+
+		// one serializer factory per input
+		for (int input = 0; input < numInputs; input++) {
+			final TypeSerializerFactory<?> factory = this.config.getInputSerializer(input, this.userCodeClassLoader);
+			this.inputSerializers[input] = factory;
+		}
+
+		// the driver's comparators; the array only exists when comparators were requested
+		if (this.inputComparators != null) {
+			for (int c = 0; c < numComparators; c++) {
+				final TypeComparatorFactory<?> factory = this.config.getDriverComparator(c, this.userCodeClassLoader);
+				this.inputComparators[c] = factory.createComparator();
+			}
+		}
+	}
+
+	/**
+	 * Sets up the driver and, if the driver declares a stub type, instantiates a fresh user function.
+	 *
+	 * @throws Exception if the driver setup fails
+	 * @throws RuntimeException if instantiating the user function fails
+	 */
+	protected void initialize() throws Exception {
+		// set up the driver
+		try {
+			this.driver.setup(this);
+		}
+		catch (Throwable t) {
+			throw new Exception("The driver setup for '" + this.taskName +
+					"' , caused an error: " + t.getMessage(), t);
+		}
+
+		// instantiate the UDF
+		try {
+			final Class<? super S> userCodeFunctionType = this.driver.getStubType();
+			// if the class is null, the driver has no user code
+			if (userCodeFunctionType != null) {
+				this.stub = initStub(userCodeFunctionType);
+			}
+		} catch (Exception e) {
+			// the conditional must be parenthesized; otherwise the whole concatenation is compared to null
+			throw new RuntimeException("Initializing the UDF" +
+					(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
+		}
+	}
+
+	/*
+	public RuntimeUDFContext createRuntimeContext() {
+		return new RuntimeUDFContext(this.taskName, this.numberOfSubtasks, this.indexInSubtaskGroup, null);
+	}
+	*/
+
+	protected void closeLocalStrategies() {
+		if (this.localStrategies != null) {
+			for (int i = 0; i < this.localStrategies.length; i++) {
+				if (this.localStrategies[i] != null) {
+					try {
+						this.localStrategies[i].close();
+					} catch (Throwable t) {
+						LOG.error("Error closing local strategy for input " + i, t);
+					}
+				}
+			}
+		}
+	}
+
+	/**
+	 * Runs the driver: prepares it, opens the user function, runs the driver, then closes the user
+	 * function and the output collector. The driver is cleaned up in every case.
+	 *
+	 * @throws RuntimeException wrapping any exception raised while preparing, opening, running or closing
+	 */
+	protected void run() throws Exception {
+		// ---------------------------- Now, the actual processing starts ------------------------
+		boolean stubOpen = false;
+
+		try {
+			// run the data preparation
+			try {
+				this.driver.prepare();
+			}
+			catch (Throwable t) {
+				throw new Exception("The data preparation for task '" + this.taskName +
+						"' , caused an error: " + t.getMessage(), t);
+			}
+
+			// open stub implementation
+			if (this.stub != null) {
+				try {
+					Configuration stubConfig = this.config.getStubParameters();
+					FunctionUtils.openFunction(this.stub, stubConfig);
+					stubOpen = true;
+				}
+				catch (Throwable t) {
+					throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
+				}
+			}
+
+			// run the user code
+			this.driver.run();
+
+			// close. We close here such that a regular close throwing an exception marks a task as failed.
+			if (this.stub != null) {
+				FunctionUtils.closeFunction(this.stub);
+				stubOpen = false;
+			}
+
+			this.output.close();
+		}
+		catch (Exception ex) {
+			// best-effort close of a still-open user function; errors are swallowed because we already
+			// have a root cause to report
+			if (stubOpen) {
+				try {
+					FunctionUtils.closeFunction(this.stub);
+				}
+				catch (Throwable ignored) {
+					// the original exception is the one to report
+				}
+			}
+			LOG.error("Exception in task '" + this.taskName + "'", ex);
+			// keep the original exception as the cause instead of printing it and discarding it
+			throw new RuntimeException("Exception in TaskContext: " + ex.getMessage(), ex);
+		}
+		finally {
+			this.driver.cleanup();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
new file mode 100644
index 0000000..94a8315
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class TezTaskConfig extends TaskConfig {
+
+	public static final String TEZ_TASK_CONFIG = "tez.task.flink.processor.taskconfig";
+
+	private static final String NUMBER_SUBTASKS_IN_OUTPUTS = "tez.num_subtasks_in_output";
+
+	private static final String INPUT_SPLIT_PROVIDER = "tez.input_split_provider";
+
+	private static final String INPUT_POSITIONS = "tez.input_positions";
+
+	private static final String INPUT_FORMAT = "tez.input_format";
+
+	private static final String DATASOURCE_PROCESSOR_NAME = "tez.datasource_processor_name";
+
+	public TezTaskConfig(Configuration config) {
+		super(config);
+	}
+
+
+	public void setDatasourceProcessorName(String name) {
+		if (name != null) {
+			this.config.setString(DATASOURCE_PROCESSOR_NAME, name);
+		}
+	}
+
+	public String getDatasourceProcessorName() {
+		return this.config.getString(DATASOURCE_PROCESSOR_NAME, null);
+	}
+
+	public void setNumberSubtasksInOutput(ArrayList<Integer> numberSubtasksInOutputs) {
+		try {
+			InstantiationUtil.writeObjectToConfig(numberSubtasksInOutputs, this.config, NUMBER_SUBTASKS_IN_OUTPUTS);
+		} catch (IOException e) {
+			throw new RuntimeException("Error while writing the input split provider object to the task configuration.");
+		}
+	}
+
+	public ArrayList<Integer> getNumberSubtasksInOutput() {
+		ArrayList<Integer> numberOfSubTasksInOutputs = null;
+		try {
+			numberOfSubTasksInOutputs = (ArrayList) InstantiationUtil.readObjectFromConfig(this.config, NUMBER_SUBTASKS_IN_OUTPUTS, getClass().getClassLoader());
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Error while reading the number of subtasks in outputs object from the task configuration.");
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException("Error while reading the number of subtasks in outpurs object from the task configuration. " +
+					"class not found.");
+		}
+		if (numberOfSubTasksInOutputs == null) {
+			throw new NullPointerException();
+		}
+		return numberOfSubTasksInOutputs;
+
+	}
+
+
+	public void setInputSplitProvider (InputSplitProvider inputSplitProvider) {
+		try {
+			InstantiationUtil.writeObjectToConfig(inputSplitProvider, this.config, INPUT_SPLIT_PROVIDER);
+		} catch (IOException e) {
+			throw new RuntimeException("Error while writing the input split provider object to the task configuration.");
+		}
+	}
+
+	public InputSplitProvider getInputSplitProvider () {
+		InputSplitProvider inputSplitProvider = null;
+		try {
+			inputSplitProvider = (InputSplitProvider) InstantiationUtil.readObjectFromConfig(this.config, INPUT_SPLIT_PROVIDER, getClass().getClassLoader());
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Error while reading the input split provider object from the task configuration.");
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException("Error while reading the input split provider object from the task configuration. " +
+					"ChannelSelector class not found.");
+		}
+		if (inputSplitProvider == null) {
+			throw new NullPointerException();
+		}
+		return inputSplitProvider;
+	}
+
+
+	public void setInputPositions(HashMap<String,ArrayList<Integer>> inputPositions) {
+		try {
+			InstantiationUtil.writeObjectToConfig(inputPositions, this.config, INPUT_POSITIONS);
+		} catch (IOException e) {
+			throw new RuntimeException("Error while writing the input positions object to the task configuration.");
+		}
+	}
+
+	public HashMap<String,ArrayList<Integer>> getInputPositions () {
+		HashMap<String,ArrayList<Integer>> inputPositions = null;
+		try {
+			inputPositions = (HashMap) InstantiationUtil.readObjectFromConfig(this.config, INPUT_POSITIONS, getClass().getClassLoader());
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Error while reading the input positions object from the task configuration.");
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException("Error while reading the input positions object from the task configuration. " +
+					"ChannelSelector class not found.");
+		}
+		if (inputPositions == null) {
+			throw new NullPointerException();
+		}
+		return inputPositions;
+	}
+
+	public void setInputFormat (InputFormat inputFormat) {
+		try {
+			InstantiationUtil.writeObjectToConfig(inputFormat, this.config, INPUT_FORMAT);
+		} catch (IOException e) {
+			throw new RuntimeException("Error while writing the input format object to the task configuration.");
+		}
+	}
+
+	public InputFormat getInputFormat () {
+		InputFormat inputFormat = null;
+		try {
+			inputFormat = (InputFormat) InstantiationUtil.readObjectFromConfig(this.config, INPUT_FORMAT, getClass().getClassLoader());
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Error while reading the input split provider object from the task configuration.");
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException("Error while reading the input split provider object from the task configuration. " +
+					"ChannelSelector class not found.");
+		}
+		if (inputFormat == null) {
+			throw new NullPointerException();
+		}
+		return inputFormat;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
new file mode 100644
index 0000000..7ceeac8
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tez processor that copies every key/value pair from all of its inputs, input by input, to its
+ * single output.
+ */
+public class UnionProcessor extends AbstractLogicalIOProcessor {
+
+	private TezTaskConfig config;
+	protected Map<String, LogicalInput> inputs;
+	protected Map<String, LogicalOutput> outputs;
+	private List<KeyValueReader> readers;
+	private List<KeyValueWriter> writers;
+	private int numInputs;
+	private int numOutputs;
+
+	public UnionProcessor(ProcessorContext context) {
+		super(context);
+	}
+
+	/** Decodes the task config from the user payload and names the task after the Tez vertex. */
+	@Override
+	public void initialize() throws Exception {
+		UserPayload payload = getContext().getUserPayload();
+		Configuration conf = TezUtils.createConfFromUserPayload(payload);
+
+		this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
+		config.setTaskName(getContext().getTaskVertexName());
+	}
+
+	/** Processor events are ignored. */
+	@Override
+	public void handleEvents(List<Event> processorEvents) {
+
+	}
+
+	/** Nothing to release. */
+	@Override
+	public void close() throws Exception {
+
+	}
+
+	/**
+	 * Starts all inputs and outputs, then forwards every pair of every input to the single output.
+	 *
+	 * @throws IllegalArgumentException if the processor does not have exactly one output
+	 */
+	@Override
+	public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
+		this.inputs = inputs;
+		this.outputs = outputs;
+		// the maps are null-checked below, so size them null-safely as well
+		this.numInputs = inputs == null ? 0 : inputs.size();
+		this.numOutputs = outputs == null ? 0 : outputs.size();
+
+		this.readers = new ArrayList<KeyValueReader>(numInputs);
+		if (this.inputs != null) {
+			for (LogicalInput input: this.inputs.values()) {
+				input.start();
+				readers.add((KeyValueReader) input.getReader());
+			}
+		}
+
+		this.writers = new ArrayList<KeyValueWriter>(numOutputs);
+		if (this.outputs != null) {
+			for (LogicalOutput output : this.outputs.values()) {
+				output.start();
+				writers.add((KeyValueWriter) output.getWriter());
+			}
+		}
+
+		Preconditions.checkArgument(writers.size() == 1,
+				"UnionProcessor requires exactly one output, but has %s.", writers.size());
+		KeyValueWriter writer = writers.get(0);
+
+		for (KeyValueReader reader: this.readers) {
+			while (reader.next()) {
+				writer.write(reader.getCurrentKey(), reader.getCurrentValue());
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
new file mode 100644
index 0000000..ef59fd0
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.input;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class FlinkInput extends AbstractLogicalInput {
+
+	private static final Log LOG = LogFactory.getLog(FlinkInput.class);
+	
+	private InputSplit split;
+	private boolean splitIsCreated;
+	private final ReentrantLock rrLock = new ReentrantLock();
+	private final Condition rrInited = rrLock.newCondition();
+
+	public FlinkInput(InputContext inputContext, int numPhysicalInputs) {
+		super(inputContext, numPhysicalInputs);
+		getContext().requestInitialMemory(0l, null); // mandatory call
+		split = null;
+	}
+
+	@Override
+	public void handleEvents(List<Event> inputEvents) throws Exception {
+		
+		LOG.info("Received " + inputEvents.size() + " events (should be = 1)");
+		
+		Event event = inputEvents.iterator().next();
+		
+		Preconditions.checkArgument(event instanceof InputDataInformationEvent,
+				getClass().getSimpleName()
+						+ " can only handle a single event of type: "
+						+ InputDataInformationEvent.class.getSimpleName());
+
+		initSplitFromEvent ((InputDataInformationEvent)event);
+	}
+
+	private void initSplitFromEvent (InputDataInformationEvent e) throws Exception {
+		rrLock.lock();
+
+		try {
+			ByteString byteString = ByteString.copyFrom(e.getUserPayload());
+			this.split =  (InputSplit) InstantiationUtil.deserializeObject(byteString.toByteArray(), getClass().getClassLoader());
+			this.splitIsCreated = true;
+			
+			LOG.info ("Initializing input split " + split.getSplitNumber() + ": " + split.toString() + " from event (" + e.getSourceIndex() + "," + e.getTargetIndex() + "): " + e.toString());
+			
+			rrInited.signal();
+		}
+		catch (Exception ex) {
+			throw new IOException(
+					"Interrupted waiting for InputSplit initialization");
+		}
+		finally {
+			rrLock.unlock();
+		}
+	}
+
+	@Override
+	public List<Event> close() throws Exception {
+		return null;
+	}
+
+	@Override
+	public void start() throws Exception {
+	}
+
+	@Override
+	public Reader getReader() throws Exception {
+		throw new RuntimeException("FlinkInput does not contain a Reader. Should use getSplit instead");
+	}
+
+	@Override
+	public List<Event> initialize() throws Exception {
+		return null;
+	}
+
+	public InputSplit getSplit () throws Exception {
+
+		rrLock.lock();
+		try {
+			if (!splitIsCreated) {
+				checkAndAwaitSplitInitialization();
+			}
+		}
+		finally {
+			rrLock.unlock();
+		}
+		if (split == null) {
+			LOG.info("Input split has not been created. This should not happen");
+			throw new RuntimeException("Input split has not been created. This should not happen");
+		}
+		return split;
+	}
+
+	void checkAndAwaitSplitInitialization() throws IOException {
+		assert rrLock.getHoldCount() == 1;
+		rrLock.lock();
+		try {
+			rrInited.await();
+		} catch (Exception e) {
+			throw new IOException(
+					"Interrupted waiting for InputSplit initialization");
+		} finally {
+			rrLock.unlock();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
new file mode 100644
index 0000000..db1261c
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.input;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Tez {@link InputInitializer} that creates the input splits of a Flink {@link InputFormat} and wraps each
+ * split in an {@link InputDataInformationEvent} addressed to one of the vertex's tasks.
+ */
+public class FlinkInputSplitGenerator extends InputInitializer {
+
+	private static final Log LOG = LogFactory.getLog(FlinkInputSplitGenerator.class);
+
+	/** The input format taken from the task configuration in {@link #initialize()}. */
+	InputFormat format;
+
+	public FlinkInputSplitGenerator(InputInitializerContext initializerContext) {
+		super(initializerContext);
+	}
+
+	/**
+	 * Reads the {@link TezTaskConfig} from the user payload, creates the splits of its input format and
+	 * assigns them to the vertex's tasks round robin (split {@code i} goes to task {@code i % numTargets}).
+	 *
+	 * @return one event carrying a serialized split per created input split
+	 * @throws IllegalStateException if the user payload contains no encoded task configuration
+	 * @throws Exception if reading the payload, creating the splits or serializing a split fails
+	 */
+	@Override
+	public List<Event> initialize() throws Exception {
+
+		Configuration tezConf = TezUtils.createConfFromUserPayload(this.getContext().getUserPayload());
+
+		TezTaskConfig taskConfig = (TezTaskConfig) EncodingUtils.decodeObjectFromString(tezConf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
+		if (taskConfig == null) {
+			throw new IllegalStateException("No task configuration found under key " + TezTaskConfig.TEZ_TASK_CONFIG);
+		}
+
+		this.format = taskConfig.getInputFormat();
+
+		int numTasks = this.getContext().getNumTasks();
+		// A non-positive task count is clamped to one target. The same clamped value is used both for
+		// requesting splits and for distributing them, so "i % numTargets" can never divide by zero.
+		int numTargets = (numTasks > 0) ? numTasks : 1;
+
+		LOG.info("Creating splits for " + numTasks + " tasks for input format " + format);
+
+		InputSplit[] splits = format.createInputSplits(numTargets);
+
+		LOG.info("Created " + splits.length + " input splits for input format " + format);
+
+		LOG.info("Sending input split events");
+		List<Event> events = new LinkedList<Event>();
+		for (int i = 0; i < splits.length; i++) {
+			byte[] bytes = InstantiationUtil.serializeObject(splits[i]);
+			ByteBuffer buf = ByteBuffer.wrap(bytes);
+			int targetTask = i % numTargets;
+			InputDataInformationEvent event = InputDataInformationEvent.createWithSerializedPayload(targetTask, buf);
+			event.setTargetIndex(targetTask);
+			events.add(event);
+			LOG.info("Added event of index " + i + ": " + event);
+		}
+		return events;
+	}
+
+	/** Initializer events are not used by this initializer and are ignored. */
+	@Override
+	public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
+		// intentionally empty
+	}
+
+	/** Vertex state updates are not used by this initializer and are ignored. */
+	@Override
+	public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+		// intentionally empty
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
new file mode 100644
index 0000000..722f0a1
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.input;
+
+
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+import java.io.IOException;
+
+/**
+ * Adapts a Tez {@link KeyValueReader} to Flink's {@link MutableObjectIterator}. Keys must be
+ * {@link IntWritable}s (presumably the channel index written by the producer; they are only type-checked),
+ * and the values are returned as the records.
+ */
+public class TezReaderIterator<T> implements MutableObjectIterator<T> {
+
+	private final KeyValueReader kvReader;
+
+	public TezReaderIterator(KeyValueReader kvReader) {
+		this.kvReader = kvReader;
+	}
+
+	/**
+	 * Returns the next record. The {@code reuse} object is not filled; the value instance provided
+	 * by the reader is returned instead.
+	 *
+	 * @return the next record, or {@code null} if the reader is exhausted
+	 * @throws IllegalStateException if the current key is not an {@link IntWritable}
+	 */
+	@Override
+	public T next(T reuse) throws IOException {
+		return readNext();
+	}
+
+	/**
+	 * Returns the next record.
+	 *
+	 * @return the next record, or {@code null} if the reader is exhausted
+	 * @throws IllegalStateException if the current key is not an {@link IntWritable}
+	 */
+	@Override
+	public T next() throws IOException {
+		return readNext();
+	}
+
+	/** Shared implementation of both {@code next} variants. */
+	@SuppressWarnings("unchecked") // the reader is untyped; values are assumed to be of type T
+	private T readNext() throws IOException {
+		if (!kvReader.next()) {
+			return null;
+		}
+		Object key = kvReader.getCurrentKey();
+		Object value = kvReader.getCurrentValue();
+		if (!(key instanceof IntWritable)) {
+			throw new IllegalStateException("Wrong key type");
+		}
+		return (T) value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
new file mode 100644
index 0000000..2358f29
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.output;
+
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.tez.runtime.library.api.Partitioner;
+
+/**
+ * Tez {@link Partitioner} that uses the key itself as the partition index. Keys must be
+ * {@link IntWritable}s; their value is returned as-is and is not checked against {@code numPartitions}.
+ */
+public class SimplePartitioner implements Partitioner {
+
+	@Override
+	public int getPartition(Object key, Object value, int numPartitions) {
+		if (key instanceof IntWritable) {
+			return ((IntWritable) key).get();
+		}
+		throw new IllegalStateException("Partitioning key should be int");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
new file mode 100644
index 0000000..7e5cd55
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.output;
+
+import java.io.Serializable;
+
+/**
+ * Chooses the output channels a record is routed to. Instances are {@link Serializable}.
+ *
+ * @param <T> the type of the records to route
+ */
+public interface TezChannelSelector<T> extends Serializable {
+
+	/**
+	 * Determines the indices of the output channels to which the given record shall be forwarded.
+	 * Implementations may reuse the returned array between calls, so callers should consume the
+	 * result before calling this method again.
+	 *
+	 * @param record
+	 *        the record to determine the output channels for
+	 * @param numberOfOutputChannels
+	 *        the total number of output channels attached to the respective output
+	 * @return a (possibly empty) array of integer numbers which indicate the indices of the output channels through
+	 *         which the record shall be forwarded
+	 */
+	int[] selectChannels(T record, int numberOfOutputChannels);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
new file mode 100644
index 0000000..b68e6c8
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.output;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link Collector} that writes every record to each of several Tez {@link KeyValueWriter}s. For output
+ * {@code i}, the {@code i}-th {@link TezChannelSelector} picks target channels among the {@code i}-th stream
+ * count, and the record is written once per selected channel, keyed by the channel index as an
+ * {@link IntWritable}.
+ */
+public class TezOutputCollector<T> implements Collector<T> {
+
+	private final List<KeyValueWriter> writers;
+
+	private final List<TezChannelSelector<T>> outputEmitters;
+
+	private final List<Integer> numberOfStreamsInOutputs;
+
+	private final int numOutputs;
+
+	// stored but not used by this class
+	private final TypeSerializer<T> serializer;
+
+	/**
+	 * @param writers one writer per output; its size defines the number of outputs
+	 * @param outputEmitters channel selector per output, index-aligned with {@code writers}
+	 * @param serializer the record serializer (kept, not used here)
+	 * @param numberOfStreamsInOutputs number of channels per output, index-aligned with {@code writers}
+	 */
+	public TezOutputCollector(List<KeyValueWriter> writers, List<TezChannelSelector<T>> outputEmitters, TypeSerializer<T> serializer, List<Integer> numberOfStreamsInOutputs) {
+		this.numOutputs = writers.size();
+		this.writers = writers;
+		this.outputEmitters = outputEmitters;
+		this.numberOfStreamsInOutputs = numberOfStreamsInOutputs;
+		this.serializer = serializer;
+	}
+
+	/**
+	 * Emits the record to every output.
+	 *
+	 * @throws RuntimeException wrapping any {@link IOException} raised by a writer
+	 */
+	@Override
+	public void collect(T record) {
+		for (int output = 0; output < numOutputs; output++) {
+			emitToOutput(record, output);
+		}
+	}
+
+	/** Writes the record to all channels that the given output's selector chooses. */
+	private void emitToOutput(T record, int output) {
+		final KeyValueWriter target = writers.get(output);
+		final TezChannelSelector<T> selector = outputEmitters.get(output);
+		final int streams = numberOfStreamsInOutputs.get(output);
+		try {
+			final int[] selected = selector.selectChannels(record, streams);
+			for (int k = 0; k < selected.length; k++) {
+				target.write(new IntWritable(selected[k]), record);
+			}
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
+		}
+	}
+
+	/** Does nothing; the writers are not closed by this collector. */
+	@Override
+	public void close() {
+		// intentionally empty
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
new file mode 100644
index 0000000..6dcee0b
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.output;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+/**
+ * {@link TezChannelSelector} that routes records according to a Flink {@link ShipStrategyType}:
+ * FORWARD, PARTITION_RANDOM and PARTITION_FORCED_REBALANCE go round robin, PARTITION_HASH hashes the record
+ * with the comparator, and BROADCAST selects every channel. PARTITION_RANGE is accepted by the constructor
+ * but selecting channels for it throws {@link UnsupportedOperationException}.
+ * <p>
+ * Instances keep mutable state (round-robin counter, reused result array) without synchronization.
+ */
+public class TezOutputEmitter<T> implements TezChannelSelector<T> {
+
+	private final ShipStrategyType strategy;		// the shipping strategy used by this output emitter
+
+	private int[] channels;						// the reused array defining target channels
+
+	private int nextChannelToSendTo = 0;		// counter to go over channels round robin
+
+	private final TypeComparator<T> comparator;	// the comparator for hashing / sorting
+
+	// ------------------------------------------------------------------------
+	// Constructors
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new channel selector that distributes data round robin.
+	 */
+	public TezOutputEmitter() {
+		// FORWARD is dispatched to round-robin selection. ShipStrategyType.NONE cannot be used here:
+		// the validating constructor rejects it with an IllegalArgumentException.
+		this(ShipStrategyType.FORWARD);
+	}
+
+	/**
+	 * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...).
+	 *
+	 * @param strategy The distribution strategy to be used.
+	 */
+	public TezOutputEmitter(ShipStrategyType strategy) {
+		this(strategy, null);
+	}
+
+	/**
+	 * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
+	 * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
+	 *
+	 * @param strategy The distribution strategy to be used.
+	 * @param comparator The comparator used to hash / compare the records.
+	 */
+	public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator) {
+		this(strategy, comparator, null);
+	}
+
+	/**
+	 * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
+	 * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
+	 *
+	 * @param strategy The distribution strategy to be used.
+	 * @param comparator The comparator used to hash / compare the records; required for hash partitioning.
+	 * @param distr The distribution pattern used in the case of a range partitioning.
+	 * @throws NullPointerException if the strategy is null, if it is hash partitioning without a comparator,
+	 *         or if it is range partitioning without a data distribution
+	 * @throws IllegalArgumentException if the strategy is not supported by this emitter
+	 */
+	public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, DataDistribution distr) {
+		if (strategy == null) {
+			throw new NullPointerException("The ship strategy must not be null.");
+		}
+
+		this.strategy = strategy;
+		this.comparator = comparator;
+
+		switch (strategy) {
+			case FORWARD:
+			case PARTITION_HASH:
+			case PARTITION_RANGE:
+			case PARTITION_RANDOM:
+			case PARTITION_FORCED_REBALANCE:
+			case BROADCAST:
+				break;
+			default:
+				throw new IllegalArgumentException("Invalid shipping strategy for OutputEmitter: " + strategy.name());
+		}
+
+		// Fail fast instead of with an unexplained NullPointerException on the first record.
+		if (strategy == ShipStrategyType.PARTITION_HASH && comparator == null) {
+			throw new NullPointerException("Comparator must not be null when the ship strategy is hash partitioning.");
+		}
+
+		// The distribution is only validated, not stored: range partitioning is not implemented.
+		if ((strategy == ShipStrategyType.PARTITION_RANGE) && distr == null) {
+			throw new NullPointerException("Data distribution must not be null when the ship strategy is range partitioning.");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Channel Selection
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Selects the target channels for the record according to the configured strategy.
+	 * The returned array is reused by subsequent calls.
+	 */
+	@Override
+	public final int[] selectChannels(T record, int numberOfChannels) {
+		switch (strategy) {
+			case FORWARD:
+			case PARTITION_RANDOM:
+			case PARTITION_FORCED_REBALANCE:
+				return robin(numberOfChannels);
+			case PARTITION_HASH:
+				return hashPartitionDefault(record, numberOfChannels);
+			case PARTITION_RANGE:
+				return rangePartition(record, numberOfChannels);
+			case BROADCAST:
+				return broadcast(numberOfChannels);
+			default:
+				throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/** Advances the round-robin counter (starting after channel 0) and returns it as the single target. */
+	private final int[] robin(int numberOfChannels) {
+		if (this.channels == null || this.channels.length != 1) {
+			this.channels = new int[1];
+		}
+
+		int nextChannel = nextChannelToSendTo + 1;
+		nextChannel = nextChannel < numberOfChannels ? nextChannel : 0;
+
+		this.nextChannelToSendTo = nextChannel;
+		this.channels[0] = nextChannel;
+		return this.channels;
+	}
+
+	/** Returns all channel indices {@code 0 .. numberOfChannels-1}, rebuilding the array only if the count changed. */
+	private final int[] broadcast(int numberOfChannels) {
+		if (channels == null || channels.length != numberOfChannels) {
+			channels = new int[numberOfChannels];
+			for (int i = 0; i < numberOfChannels; i++) {
+				channels[i] = i;
+			}
+		}
+
+		return channels;
+	}
+
+	/** Hashes the record with the comparator, scrambles the hash and maps it to a single channel. */
+	private final int[] hashPartitionDefault(T record, int numberOfChannels) {
+		if (channels == null || channels.length != 1) {
+			channels = new int[1];
+		}
+
+		int hash = this.comparator.hash(record);
+
+		hash = murmurHash(hash);
+
+		if (hash >= 0) {
+			this.channels[0] = hash % numberOfChannels;
+		}
+		else if (hash != Integer.MIN_VALUE) {
+			this.channels[0] = -hash % numberOfChannels;
+		}
+		else {
+			// -Integer.MIN_VALUE overflows back to a negative value, so map it explicitly.
+			this.channels[0] = 0;
+		}
+
+		return this.channels;
+	}
+
+	/**
+	 * Scrambles a hash code using the MurmurHash3 32-bit mixing constants. This is not bit-identical to
+	 * reference MurmurHash3 (it multiplies by 0xe6546b64 rather than computing {@code k * 5 + 0xe6546b64}).
+	 * Changing it changes which channel each record is routed to.
+	 */
+	private final int murmurHash(int k) {
+		k *= 0xcc9e2d51;
+		k = Integer.rotateLeft(k, 15);
+		k *= 0x1b873593;
+
+		k = Integer.rotateLeft(k, 13);
+		k *= 0xe6546b64;
+
+		k ^= 4;
+		k ^= k >>> 16;
+		k *= 0x85ebca6b;
+		k ^= k >>> 13;
+		k *= 0xc2b2ae35;
+		k ^= k >>> 16;
+
+		return k;
+	}
+
+	/** Range partitioning is not implemented. */
+	private final int[] rangePartition(T record, int numberOfChannels) {
+		throw new UnsupportedOperationException("Range partitioning is not supported by TezOutputEmitter.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
new file mode 100644
index 0000000..39d247c
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.util;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * An {@link AbstractInvokable} whose lifecycle methods do nothing; it only carries an {@link ExecutionConfig}.
+ */
+public class DummyInvokable extends AbstractInvokable {
+
+	/** Returned by {@link #getExecutionConfig()}; {@code null} unless one is supplied. */
+	private ExecutionConfig executionConfig;
+
+	/** Creates an instance without an execution config. */
+	public DummyInvokable() {
+		this(null);
+	}
+
+	/**
+	 * Creates an instance carrying the given execution config.
+	 *
+	 * @param executionConfig the config to return from {@link #getExecutionConfig()}; may be {@code null}
+	 */
+	public DummyInvokable(ExecutionConfig executionConfig) {
+		this.executionConfig = executionConfig;
+	}
+
+	/** Replaces the config returned by {@link #getExecutionConfig()}. */
+	public void setExecutionConfig(ExecutionConfig executionConfig) {
+		this.executionConfig = executionConfig;
+	}
+
+	@Override
+	public ExecutionConfig getExecutionConfig() {
+		return this.executionConfig;
+	}
+
+	/** Nothing to register. */
+	@Override
+	public void registerInputOutput() {
+	}
+
+	/** Nothing to execute. */
+	@Override
+	public void invoke() throws Exception {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
new file mode 100644
index 0000000..202cb24
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.util;
+
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.commons.codec.binary.Base64;
+
+import java.io.IOException;
+
+/**
+ * Converts serializable objects to and from Base64 strings, e.g. for storing them as configuration values.
+ */
+public class EncodingUtils {
+
+	/**
+	 * Decodes a Base64 string produced by {@link #encodeObjectToString(Object)} back into an object.
+	 * NOTE(review): this performs Java deserialization; only use it on strings produced by trusted code.
+	 *
+	 * @param encoded the Base64 encoding of the serialized object; may be {@code null}
+	 * @param cl the class loader handed to the deserialization
+	 * @return the deserialized object, or {@code null} if {@code encoded} is {@code null}
+	 * @throws RuntimeException if the bytes cannot be deserialized or a class cannot be found
+	 */
+	public static Object decodeObjectFromString(String encoded, ClassLoader cl) {
+		if (encoded == null) {
+			return null;
+		}
+		byte[] bytes = Base64.decodeBase64(encoded);
+
+		// Failures are reported to the caller instead of terminating the JVM via System.exit.
+		try {
+			return InstantiationUtil.deserializeObject(bytes, cl);
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Could not deserialize the Base64-encoded object.", e);
+		}
+		catch (ClassNotFoundException e) {
+			throw new RuntimeException("Could not find the class of the Base64-encoded object.", e);
+		}
+	}
+
+	/**
+	 * Serializes an object and encodes the bytes as a Base64 string.
+	 *
+	 * @param o the object to encode
+	 * @return the Base64 encoding of the serialized object
+	 * @throws RuntimeException if the object cannot be serialized
+	 */
+	public static String encodeObjectToString(Object o) {
+		try {
+			byte[] bytes = InstantiationUtil.serializeObject(o);
+			return Base64.encodeBase64String(bytes);
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Could not serialize object of type "
+					+ (o == null ? "null" : o.getClass().getName()) + ".", e);
+		}
+	}
+}