You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/04/02 20:08:29 UTC
[2/5] flink git commit: Added support for Apache Tez as an execution
environment
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
new file mode 100644
index 0000000..01dbbc5
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
+import org.apache.flink.runtime.operators.util.CloseableInputProvider;
+import org.apache.flink.tez.runtime.input.TezReaderIterator;
+import org.apache.flink.tez.util.DummyInvokable;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tez processor that executes a Flink data sink.
+ *
+ * <p>It reads the records of its single logical input, optionally applies a sorting local
+ * strategy, and writes every record through the user-defined {@link OutputFormat}.
+ */
+public class DataSinkProcessor<IT> extends AbstractLogicalIOProcessor {
+
+	// Tez stuff
+	private TezTaskConfig config;
+	protected Map<String, LogicalInput> inputs;
+	private List<KeyValueReader> readers;
+	private int numInputs;
+	private TezRuntimeEnvironment runtimeEnvironment;
+	AbstractInvokable invokable = new DummyInvokable();
+
+	// Flink stuff
+	private OutputFormat<IT> format;
+	private ClassLoader userCodeClassLoader = this.getClass().getClassLoader();
+	private CloseableInputProvider<IT> localStrategy;
+	// input reader
+	private MutableObjectIterator<IT> reader;
+	// input iterator (after the local strategy has been applied)
+	private MutableObjectIterator<IT> input;
+	private TypeSerializerFactory<IT> inputTypeSerializerFactory;
+
+	public DataSinkProcessor(ProcessorContext context) {
+		super(context);
+	}
+
+	/**
+	 * Decodes the task configuration from the Tez user payload, creates the Flink runtime
+	 * environment, and instantiates and configures the output format.
+	 */
+	@Override
+	public void initialize() throws Exception {
+		UserPayload payload = getContext().getUserPayload();
+		Configuration conf = TezUtils.createConfFromUserPayload(payload);
+
+		this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
+		config.setTaskName(getContext().getTaskVertexName());
+
+		// 70% of the memory available to the Tez task is handed to the Flink runtime environment
+		this.runtimeEnvironment = new TezRuntimeEnvironment((long) (0.7 * this.getContext().getTotalMemoryAvailableToTask()));
+
+		this.inputTypeSerializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
+
+		initOutputFormat();
+	}
+
+	/**
+	 * Starts the single input, wraps its reader, and writes all of its records to the output format.
+	 *
+	 * @param inputs exactly one logical input
+	 * @param outputs must be null or empty, since a sink produces no Tez outputs
+	 */
+	@Override
+	public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
+
+		Preconditions.checkArgument((outputs == null) || (outputs.size() == 0));
+		Preconditions.checkNotNull(inputs);
+		Preconditions.checkArgument(inputs.size() == 1);
+
+		this.inputs = inputs;
+		this.numInputs = inputs.size();
+		this.readers = new ArrayList<KeyValueReader>(numInputs);
+		for (LogicalInput input : this.inputs.values()) {
+			input.start();
+			readers.add((KeyValueReader) input.getReader());
+		}
+
+		this.reader = new TezReaderIterator<IT>(readers.get(0));
+
+		this.invoke();
+	}
+
+	@Override
+	public void handleEvents(List<Event> processorEvents) {
+		// processor events are ignored
+	}
+
+	/**
+	 * Shuts down the I/O manager, if the runtime environment was created in {@link #initialize()}.
+	 */
+	@Override
+	public void close() throws Exception {
+		// initialize() may have failed before the runtime environment was created
+		if (this.runtimeEnvironment != null) {
+			this.runtimeEnvironment.getIOManager().shutdown();
+		}
+	}
+
+	/**
+	 * Applies the configured local strategy to the input and writes every record to the output
+	 * format. The output format and the local strategy are closed in any case.
+	 */
+	private void invoke () {
+		try {
+			// initialize local strategies
+			switch (this.config.getInputLocalStrategy(0)) {
+			case NONE:
+				// nothing to do
+				localStrategy = null;
+				input = reader;
+				break;
+			case SORT:
+				// initialize sort local strategy
+				try {
+					// get type comparator
+					TypeComparatorFactory<IT> compFact = this.config.getInputComparator(0, this.userCodeClassLoader);
+					if (compFact == null) {
+						throw new Exception("Missing comparator factory for local strategy on input " + 0);
+					}
+
+					// initialize sorter
+					UnilateralSortMerger<IT> sorter = new UnilateralSortMerger<IT>(
+						this.runtimeEnvironment.getMemoryManager(),
+						this.runtimeEnvironment.getIOManager(),
+						this.reader, this.invokable, this.inputTypeSerializerFactory, compFact.createComparator(),
+						this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0),
+						this.config.getSpillingThresholdInput(0));
+
+					this.localStrategy = sorter;
+					this.input = sorter.getIterator();
+				} catch (Exception e) {
+					// the conditional must be parenthesized: without it '+' binds first and the
+					// comparison against null is always false, dropping the message prefix
+					throw new RuntimeException("Initializing the input processing failed" +
+						(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
+				}
+				break;
+			default:
+				throw new RuntimeException("Invalid local strategy for DataSinkTask");
+			}
+
+			final TypeSerializer<IT> serializer = this.inputTypeSerializerFactory.getSerializer();
+			final MutableObjectIterator<IT> input = this.input;
+			final OutputFormat<IT> format = this.format;
+
+			IT record = serializer.createInstance();
+			format.open (this.getContext().getTaskIndex(), this.getContext().getVertexParallelism());
+
+			// work!
+			while (((record = input.next(record)) != null)) {
+				format.writeRecord(record);
+			}
+
+			// regular close; clearing the field tells the finally block that nothing is left to close
+			this.format.close();
+			this.format = null;
+		}
+		catch (IOException e) {
+			// keep the original exception as the cause instead of only printing it
+			throw new RuntimeException("The data sink failed: " + e.getMessage(), e);
+		}
+		finally {
+			if (this.format != null) {
+				// close format, if it has not been closed, yet.
+				// This should only be the case if we had a previous error, or were canceled.
+				try {
+					this.format.close();
+				}
+				catch (Throwable t) {
+					//TODO log warning message
+				}
+			}
+			// close local strategy if necessary
+			if (localStrategy != null) {
+				try {
+					this.localStrategy.close();
+				} catch (Throwable t) {
+					//TODO log warning message
+				}
+			}
+		}
+	}
+
+	/**
+	 * Instantiates the user's output format from the task configuration and calls its
+	 * {@code configure()} method with the stub parameters.
+	 */
+	private void initOutputFormat () {
+		try {
+			this.format = this.config.<OutputFormat<IT>>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(OutputFormat.class, this.userCodeClassLoader);
+
+			// check if the class is a subclass, if the check is required
+			if (!OutputFormat.class.isAssignableFrom(this.format.getClass())) {
+				throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" +
+					OutputFormat.class.getName() + "' as is required.");
+			}
+		}
+		catch (ClassCastException ccex) {
+			throw new RuntimeException("The stub class is not a proper subclass of " + OutputFormat.class.getName(), ccex);
+		}
+
+		// configure the stub. catch exceptions here extra, to report them as originating from the user code
+		try {
+			this.format.configure(this.config.getStubParameters());
+		}
+		catch (Throwable t) {
+			throw new RuntimeException("The user defined 'configure()' method in the Output Format caused an error: "
+				+ t.getMessage(), t);
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java
new file mode 100644
index 0000000..dd3f843
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.tez.runtime.input.FlinkInput;
+import org.apache.flink.tez.runtime.output.TezChannelSelector;
+import org.apache.flink.tez.runtime.output.TezOutputCollector;
+import org.apache.flink.tez.runtime.output.TezOutputEmitter;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Tez processor that executes a Flink data source.
+ *
+ * <p>It reads the input split provided by its single {@link FlinkInput}, produces records with the
+ * user-defined {@link InputFormat}, and emits them to the Tez outputs through a
+ * {@link TezOutputCollector}.
+ */
+public class DataSourceProcessor<OT> extends AbstractLogicalIOProcessor {
+
+	private TezTaskConfig config;
+	protected Map<String, LogicalOutput> outputs;
+	private List<KeyValueWriter> writers;
+	private int numOutputs;
+	private Collector<OT> collector;
+
+	private InputFormat<OT, InputSplit> format;
+	private TypeSerializerFactory<OT> serializerFactory;
+	private FlinkInput input;
+	private ClassLoader userCodeClassLoader = getClass().getClassLoader();
+
+
+	public DataSourceProcessor(ProcessorContext context) {
+		super(context);
+	}
+
+	/**
+	 * Decodes the task configuration from the Tez user payload, obtains the output serializer,
+	 * and instantiates and configures the input format.
+	 */
+	@Override
+	public void initialize() throws Exception {
+		UserPayload payload = getContext().getUserPayload();
+		Configuration conf = TezUtils.createConfFromUserPayload(payload);
+
+		this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
+		config.setTaskName(getContext().getTaskVertexName());
+
+		this.serializerFactory = config.getOutputSerializer(this.userCodeClassLoader);
+
+		initInputFormat();
+	}
+
+	@Override
+	public void handleEvents(List<Event> processorEvents) {
+		// processor events are ignored
+	}
+
+	@Override
+	public void close() throws Exception {
+
+	}
+
+	/**
+	 * Takes the single {@link FlinkInput}, starts all outputs, and runs the input format.
+	 *
+	 * @param inputs exactly one logical input, which must be a {@link FlinkInput}
+	 * @param outputs the outputs the produced records are emitted to
+	 */
+	@Override
+	public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
+
+		Preconditions.checkArgument(inputs.size() == 1);
+		LogicalInput logicalInput = inputs.values().iterator().next();
+		if (!(logicalInput instanceof FlinkInput)) {
+			throw new RuntimeException("Input to Flink Data Source Processor should be of type FlinkInput");
+		}
+		this.input = (FlinkInput) logicalInput;
+
+		// start all outputs and collect their writers
+		this.outputs = outputs;
+		this.numOutputs = (outputs == null) ? 0 : outputs.size();
+		this.writers = new ArrayList<KeyValueWriter>(numOutputs);
+		if (this.outputs != null) {
+			for (LogicalOutput output : this.outputs.values()) {
+				output.start();
+				writers.add((KeyValueWriter) output.getWriter());
+			}
+		}
+		this.invoke();
+	}
+
+
+	/**
+	 * Opens the input format on the input's split, emits every produced record, and closes the
+	 * format and the collector. Any failure is propagated after a best-effort close of the format.
+	 */
+	private void invoke () {
+		final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
+		try {
+			InputSplit split = input.getSplit();
+
+			OT record = serializer.createInstance();
+			final InputFormat<OT, InputSplit> format = this.format;
+			format.open(split);
+
+			final int outputCount = this.numOutputs;
+			ArrayList<TezChannelSelector<OT>> channelSelectors = new ArrayList<TezChannelSelector<OT>>(outputCount);
+			ArrayList<Integer> numStreamsInOutputs = this.config.getNumberSubtasksInOutput();
+			for (int i = 0; i < outputCount; i++) {
+				final ShipStrategyType strategy = config.getOutputShipStrategy(i);
+				final TypeComparatorFactory<OT> compFactory = config.getOutputComparator(i, this.userCodeClassLoader);
+				final DataDistribution dataDist = config.getOutputDataDistribution(i, this.userCodeClassLoader);
+				if (compFactory == null) {
+					channelSelectors.add(i, new TezOutputEmitter<OT>(strategy));
+				} else if (dataDist == null){
+					final TypeComparator<OT> comparator = compFactory.createComparator();
+					channelSelectors.add(i, new TezOutputEmitter<OT>(strategy, comparator));
+				} else {
+					final TypeComparator<OT> comparator = compFactory.createComparator();
+					channelSelectors.add(i,new TezOutputEmitter<OT>(strategy, comparator, dataDist));
+				}
+			}
+			collector = new TezOutputCollector<OT>(writers, channelSelectors, serializerFactory.getSerializer(), numStreamsInOutputs);
+
+			while (!format.reachedEnd()) {
+				// build next pair and ship pair if it is valid
+				if ((record = format.nextRecord(record)) != null) {
+					collector.collect(record);
+				}
+			}
+			format.close();
+
+			collector.close();
+
+		}
+		catch (Exception ex) {
+			// close the input, but do not report any exceptions, since we already have another root cause
+			try {
+				this.format.close();
+			} catch (Throwable t) {
+				// ignored: 'ex' is the root cause and is rethrown below
+			}
+			// propagate the failure so the task fails instead of silently reporting success
+			throw new RuntimeException("The data source failed: " + ex.getMessage(), ex);
+		}
+	}
+
+
+	/**
+	 * Instantiates the user's input format from the task configuration and calls its
+	 * {@code configure()} method with the stub parameters.
+	 */
+	private void initInputFormat() {
+		try {
+			this.format = config.<InputFormat<OT, InputSplit>>getStubWrapper(this.userCodeClassLoader)
+				.getUserCodeObject(InputFormat.class, this.userCodeClassLoader);
+
+			// check if the class is a subclass, if the check is required
+			if (!InputFormat.class.isAssignableFrom(this.format.getClass())) {
+				throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" +
+					InputFormat.class.getName() + "' as is required.");
+			}
+		}
+		catch (ClassCastException ccex) {
+			throw new RuntimeException("The stub class is not a proper subclass of " + InputFormat.class.getName(),
+				ccex);
+		}
+		// configure the stub. catch exceptions here extra, to report them as originating from the user code
+		try {
+			this.format.configure(this.config.getStubParameters());
+		}
+		catch (Throwable t) {
+			throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
+		}
+	}
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
new file mode 100644
index 0000000..6b39734
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Tez processor that executes a regular Flink operator (neither source nor sink).
+ *
+ * <p>It starts the Tez logical inputs and outputs, places each input's reader at the driver input
+ * positions configured for it, and hands readers and writers to a {@link TezTask}.
+ */
+public class RegularProcessor<S extends Function, OT> extends AbstractLogicalIOProcessor {
+
+	private TezTask<S,OT> task;
+	protected Map<String, LogicalInput> inputs;
+	protected Map<String, LogicalOutput> outputs;
+	private List<KeyValueReader> readers;
+	private List<KeyValueWriter> writers;
+	private int numInputs;
+	private int numOutputs;
+
+
+	public RegularProcessor(ProcessorContext context) {
+		super(context);
+	}
+
+	/**
+	 * Decodes the task configuration from the Tez user payload and creates the {@link TezTask}
+	 * together with its runtime UDF context.
+	 */
+	@Override
+	public void initialize() throws Exception {
+		UserPayload payload = getContext().getUserPayload();
+		Configuration conf = TezUtils.createConfFromUserPayload(payload);
+
+		TezTaskConfig taskConfig = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
+		taskConfig.setTaskName(getContext().getTaskVertexName());
+
+		RuntimeUDFContext runtimeUdfContext = new RuntimeUDFContext(getContext().getTaskVertexName(),
+			getContext().getVertexParallelism(),
+			getContext().getTaskIndex(),
+			getClass().getClassLoader(),
+			new ExecutionConfig());
+
+		this.task = new TezTask<S, OT>(taskConfig, runtimeUdfContext, this.getContext().getTotalMemoryAvailableToTask());
+	}
+
+	@Override
+	public void handleEvents(List<Event> processorEvents) {
+		// processor events are ignored
+	}
+
+	/**
+	 * Shuts down the task's I/O manager, if the task was created in {@link #initialize()}.
+	 */
+	@Override
+	public void close() throws Exception {
+		// initialize() may have failed before the task was created
+		if (task != null) {
+			task.getIOManager().shutdown();
+		}
+	}
+
+	/**
+	 * Starts all inputs and outputs, maps every input's reader to its configured driver input
+	 * positions, and runs the task.
+	 *
+	 * @throws IllegalStateException if an input has no configured input positions
+	 */
+	@Override
+	public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
+
+		this.inputs = inputs;
+		this.outputs = outputs;
+		final Class<? extends PactDriver<S, OT>> driverClass = this.task.getTaskConfig().getDriver();
+		PactDriver<S,OT> driver = InstantiationUtil.instantiate(driverClass, PactDriver.class);
+		this.numInputs = driver.getNumberOfInputs();
+		this.numOutputs = (outputs == null) ? 0 : outputs.size();
+
+		// pre-fill the list with numInputs entries so readers can be placed by position via set()
+		this.readers = new ArrayList<KeyValueReader>(numInputs);
+		for (int i = 0; i < numInputs; i++) {
+			this.readers.add(null);
+		}
+		HashMap<String, ArrayList<Integer>> inputPositions = ((TezTaskConfig) this.task.getTaskConfig()).getInputPositions();
+		if (this.inputs != null) {
+			for (Map.Entry<String, LogicalInput> entry : this.inputs.entrySet()) {
+				final String name = entry.getKey();
+				final LogicalInput input = entry.getValue();
+				input.start();
+				ArrayList<Integer> positions = inputPositions.get(name);
+				if (positions == null) {
+					throw new IllegalStateException("No input positions are configured for Tez input '" + name + "'.");
+				}
+				for (Integer pos : positions) {
+					// getReader() is requested once per position, as the input may serve several positions
+					readers.set(pos, (KeyValueReader) input.getReader());
+				}
+			}
+		}
+
+		this.writers = new ArrayList<KeyValueWriter>(numOutputs);
+		if (this.outputs != null) {
+			for (LogicalOutput output : this.outputs.values()) {
+				output.start();
+				writers.add((KeyValueWriter) output.getWriter());
+			}
+		}
+
+		// Do the work
+		task.invoke (readers, writers);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java
new file mode 100644
index 0000000..39386e6
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+
+/**
+ * Holds the Flink {@link IOManager} and {@link MemoryManager} used by a Tez processor.
+ */
+public class TezRuntimeEnvironment {
+
+	/** Default page size passed to the memory manager (32768). */
+	private static final int DEFAULT_PAGE_SIZE = 32768;
+
+	/** Default number of slots passed to the memory manager. */
+	private static final int DEFAULT_NUM_SLOTS = 10;
+
+	private final IOManager ioManager;
+
+	private final MemoryManager memoryManager;
+
+	/**
+	 * Creates the environment with the default slot count and page size.
+	 *
+	 * @param totalMemory the memory size handed to the memory manager
+	 */
+	public TezRuntimeEnvironment(long totalMemory) {
+		this(totalMemory, DEFAULT_NUM_SLOTS, DEFAULT_PAGE_SIZE);
+	}
+
+	/**
+	 * Creates the environment with an explicit slot count and page size.
+	 *
+	 * @param totalMemory the memory size handed to the memory manager
+	 * @param numSlots the number of slots of the memory manager
+	 * @param pageSize the page size of the memory manager
+	 */
+	public TezRuntimeEnvironment(long totalMemory, int numSlots, int pageSize) {
+		this.memoryManager = new DefaultMemoryManager(totalMemory, numSlots, pageSize);
+		this.ioManager = new IOManagerAsync();
+	}
+
+	public IOManager getIOManager() {
+		return ioManager;
+	}
+
+	public MemoryManager getMemoryManager() {
+		return memoryManager;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
new file mode 100644
index 0000000..90df992
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.runtime.operators.PactTaskContext;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.runtime.operators.util.CloseableInputProvider;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.tez.runtime.input.TezReaderIterator;
+import org.apache.flink.tez.runtime.output.TezChannelSelector;
+import org.apache.flink.tez.runtime.output.TezOutputEmitter;
+import org.apache.flink.tez.runtime.output.TezOutputCollector;
+import org.apache.flink.tez.util.DummyInvokable;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import java.io.IOException;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+
+public class TezTask<S extends Function,OT> implements PactTaskContext<S, OT> {
+
+ protected static final Log LOG = LogFactory.getLog(TezTask.class);
+
+ /**
+ * Stand-in invokable for Flink APIs that expect an invokable task; the constructor sets its execution config.
+ * NOTE(review): presumably passed as the parent task to sorters created by the local strategies — confirm.
+ */
+ DummyInvokable invokable = new DummyInvokable();
+
+ /**
+ * The driver that invokes the user code (the stub implementation). The central driver in this task
+ * (further drivers may be chained behind this driver).
+ */
+ protected volatile PactDriver<S, OT> driver;
+
+ /**
+ * The instantiated user code of this task's main operator (driver). May be null if the operator has no udf.
+ */
+ protected S stub;
+
+ /**
+ * The udf's runtime context.
+ */
+ protected RuntimeUDFContext runtimeUdfContext;
+
+ /**
+ * The collector that forwards the user code's results. May forward to a channel or to chained drivers within
+ * this task.
+ */
+ protected Collector<OT> output;
+
+ /**
+ * The inputs reader, wrapped in an iterator. Prior to the local strategies, etc...
+ */
+ protected MutableObjectIterator<?>[] inputIterators;
+
+ /**
+ * The local strategies that are applied on the inputs.
+ */
+ protected volatile CloseableInputProvider<?>[] localStrategies;
+
+ /**
+ * The inputs to the operator. Return the readers' data after the application of the local strategy
+ * and the temp-table barrier.
+ */
+ protected MutableObjectIterator<?>[] inputs;
+
+ /**
+ * The serializers for the input data type.
+ */
+ protected TypeSerializerFactory<?>[] inputSerializers;
+
+ /**
+ * The comparators for the central driver.
+ */
+ protected TypeComparator<?>[] inputComparators;
+
+ /**
+ * The task configuration with the setup parameters.
+ */
+ protected TezTaskConfig config;
+
+ /**
+ * The class loader used to instantiate user code and user data types.
+ */
+ protected ClassLoader userCodeClassLoader = ClassLoader.getSystemClassLoader();
+
+ /**
+ * For now, create a default ExecutionConfig
+ */
+ protected ExecutionConfig executionConfig;
+
+ /*
+ * Tez-specific variables given by the Processor
+ */
+ /** Serializer for the records this task emits; obtained from the config's output serializer. */
+ protected TypeSerializer<OT> outSerializer;
+
+ /** Number of subtasks per output, taken from the config and handed to the output collector. */
+ protected List<Integer> numberOfSubTasksInOutputs;
+
+ /** The task name, taken from the config. */
+ protected String taskName;
+
+ /** The parallelism of this task, taken from the runtime UDF context. */
+ protected int numberOfSubtasks;
+
+ /** The index of this subtask, taken from the runtime UDF context. */
+ protected int indexInSubtaskGroup;
+
+ /** Provides this task's IOManager and MemoryManager; created in the constructor. */
+ TezRuntimeEnvironment runtimeEnvironment;
+
+	/**
+	 * Creates the task: instantiates the driver and the user function from the configuration,
+	 * obtains the output serializer and subtask information, and sets up the Flink runtime
+	 * environment.
+	 *
+	 * @param config the task configuration
+	 * @param runtimeUdfContext the runtime context of the user function; also supplies the
+	 *                          parallelism, the subtask index and the execution config
+	 * @param availableMemory memory available to the task; 70% of it goes to the runtime environment
+	 */
+	public TezTask(TezTaskConfig config, RuntimeUDFContext runtimeUdfContext, long availableMemory) {
+		this.config = config;
+		final Class<? extends PactDriver<S, OT>> driverClass = this.config.getDriver();
+		this.driver = InstantiationUtil.instantiate(driverClass, PactDriver.class);
+
+		// The system class loader is not necessarily a URLClassLoader (it is not on Java 9+); an
+		// unconditional cast would make task creation fail just for the sake of a log message.
+		if (this.userCodeClassLoader instanceof URLClassLoader) {
+			LOG.info("ClassLoader URLs: " + Arrays.toString(((URLClassLoader) this.userCodeClassLoader).getURLs()));
+		}
+
+		this.stub = this.config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(Function.class, this.userCodeClassLoader); //TODO get superclass properly
+		this.runtimeUdfContext = runtimeUdfContext;
+		this.outSerializer = (TypeSerializer<OT>) this.config.getOutputSerializer(getClass().getClassLoader()).getSerializer();
+		this.numberOfSubTasksInOutputs = this.config.getNumberSubtasksInOutput();
+		this.taskName = this.config.getTaskName();
+		this.numberOfSubtasks = this.runtimeUdfContext.getNumberOfParallelSubtasks();
+		this.indexInSubtaskGroup = this.runtimeUdfContext.getIndexOfThisSubtask();
+		this.runtimeEnvironment = new TezRuntimeEnvironment((long) (0.7 * availableMemory));
+		this.executionConfig = runtimeUdfContext.getExecutionConfig();
+		this.invokable.setExecutionConfig(this.executionConfig);
+	}
+
+
+ //-------------------------------------------------------------
+ // Interface to FlinkProcessor
+ //-------------------------------------------------------------
+
+ public void invoke(List<KeyValueReader> readers, List<KeyValueWriter> writers) throws Exception {
+
+ // whatever happens in this scope, make sure that the local strategies are cleaned up!
+ // note that the initialization of the local strategies is in the try-finally block as well,
+ // so that the thread that creates them catches its own errors that may happen in that process.
+ // this is especially important, since there may be asynchronous closes (such as through canceling).
+ try {
+ // initialize the inputs and outputs
+ initInputsOutputs(readers, writers);
+
+ // pre main-function initialization
+ initialize();
+
+ // the work goes here
+ run();
+ }
+ finally {
+ // clean up in any case!
+ closeLocalStrategies();
+ }
+ }
+
+
+	/*
+	 * Initializes the inputs, input serializers, input comparators, and the output collector.
+	 * Assumes that the config and userCodeClassLoader have been set.
+	 *
+	 * Fails with an IllegalArgumentException naming both counts if the number of readers does not
+	 * match the number of driver inputs.
+	 */
+	private void initInputsOutputs (List<KeyValueReader> readers, List<KeyValueWriter> writers) throws Exception {
+
+		final int numInputs = readers.size();
+		final int expectedInputs = driver.getNumberOfInputs();
+		Preconditions.checkArgument(numInputs == expectedInputs,
+			"The driver expects %s inputs, but %s readers were provided.", expectedInputs, numInputs);
+
+		// Prior to local strategies
+		this.inputIterators = new MutableObjectIterator[numInputs];
+		//local strategies
+		this.localStrategies = new CloseableInputProvider[numInputs];
+		// After local strategies
+		this.inputs = new MutableObjectIterator[numInputs];
+
+		int numComparators = driver.getNumberOfDriverComparators();
+		initInputsSerializersAndComparators(numInputs, numComparators);
+
+		// wrap each reader in an iterator and set up its local strategy, in reader order
+		int index = 0;
+		for (KeyValueReader reader : readers) {
+			this.inputIterators[index] = new TezReaderIterator<Object>(reader);
+			initInputLocalStrategy(index);
+			index++;
+		}
+
+		// one channel selector per output, built from the output's ship strategy and, if configured,
+		// its comparator and data distribution
+		int numOutputs = writers.size();
+		ArrayList<TezChannelSelector<OT>> channelSelectors = new ArrayList<TezChannelSelector<OT>>(numOutputs);
+		for (int i = 0; i < numOutputs; i++) {
+			final ShipStrategyType strategy = config.getOutputShipStrategy(i);
+			final TypeComparatorFactory<OT> compFactory = config.getOutputComparator(i, this.userCodeClassLoader);
+			final DataDistribution dataDist = config.getOutputDataDistribution(i, this.userCodeClassLoader);
+			if (compFactory == null) {
+				channelSelectors.add(new TezOutputEmitter<OT>(strategy));
+			} else if (dataDist == null) {
+				final TypeComparator<OT> comparator = compFactory.createComparator();
+				channelSelectors.add(new TezOutputEmitter<OT>(strategy, comparator));
+			} else {
+				final TypeComparator<OT> comparator = compFactory.createComparator();
+				channelSelectors.add(new TezOutputEmitter<OT>(strategy, comparator, dataDist));
+			}
+		}
+		this.output = new TezOutputCollector<OT>(writers, channelSelectors, outSerializer, numberOfSubTasksInOutputs);
+	}
+
+
+
+ // --------------------------------------------------------------------
+ // PactTaskContext interface
+ // --------------------------------------------------------------------
+
+    /** Returns the configuration of this task (a TezTaskConfig, which is a TaskConfig). */
+    @Override
+    public TaskConfig getTaskConfig() {
+        return this.config;
+    }
+
+    /** Returns the execution config taken from the runtime UDF context at construction. */
+    @Override
+    public ExecutionConfig getExecutionConfig() {
+        return this.executionConfig;
+    }
+
+    /** Returns the class loader used to load user code (stubs, serializers, comparators). */
+    @Override
+    public ClassLoader getUserCodeClassLoader() {
+        return userCodeClassLoader;
+    }
+
+    /** Returns the memory manager of this processor's runtime environment. */
+    @Override
+    public MemoryManager getMemoryManager() {
+        return this.runtimeEnvironment.getMemoryManager();
+    }
+
+    /** Returns the I/O manager of this processor's runtime environment. */
+    @Override
+    public IOManager getIOManager() {
+        return this.runtimeEnvironment.getIOManager();
+    }
+
+    /**
+     * Returns the iterator the driver reads for the given input. If no iterator has been
+     * assigned directly, it is fetched lazily from the input's local strategy and cached.
+     *
+     * @param index the input index, in {@code [0, numberOfInputs)}
+     * @throws IndexOutOfBoundsException if the index is out of range
+     * @throws RuntimeException if neither an iterator nor a local strategy exists, or if
+     *                          obtaining the iterator is interrupted or fails with an I/O error
+     */
+    @Override
+    public <X> MutableObjectIterator<X> getInput(int index) {
+        // valid indices are 0 .. numberOfInputs - 1 (same check as getInputSerializer)
+        if (index < 0 || index >= this.driver.getNumberOfInputs()) {
+            throw new IndexOutOfBoundsException("Input index " + index + " is out of range.");
+        }
+        // check for lazy assignment from input strategies
+        if (this.inputs[index] != null) {
+            @SuppressWarnings("unchecked")
+            MutableObjectIterator<X> in = (MutableObjectIterator<X>) this.inputs[index];
+            return in;
+        }
+
+        if (this.localStrategies[index] == null) {
+            throw new RuntimeException("Bug: null input iterator, null temp barrier, and null local strategy.");
+        }
+        try {
+            @SuppressWarnings("unchecked")
+            MutableObjectIterator<X> in = (MutableObjectIterator<X>) this.localStrategies[index].getIterator();
+            this.inputs[index] = in;
+            return in;
+        } catch (InterruptedException iex) {
+            // restore the interrupt status for code further up the stack
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while waiting for input " + index + " to become available.", iex);
+        } catch (IOException ioex) {
+            throw new RuntimeException("An I/O Exception occurred while obtaining input " + index + ".", ioex);
+        }
+    }
+
+    /**
+     * Returns the serializer factory of the given input.
+     *
+     * @throws IndexOutOfBoundsException if the index is not a valid input index
+     */
+    @Override
+    public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
+        final boolean inRange = index >= 0 && index < this.driver.getNumberOfInputs();
+        if (!inRange) {
+            throw new IndexOutOfBoundsException();
+        }
+        @SuppressWarnings("unchecked")
+        final TypeSerializerFactory<X> factory = (TypeSerializerFactory<X>) this.inputSerializers[index];
+        return factory;
+    }
+
+ @Override
+ public <X> TypeComparator<X> getDriverComparator(int index) {
+ if (this.inputComparators == null) {
+ throw new IllegalStateException("Comparators have not been created!");
+ }
+ else if (index < 0 || index >= this.driver.getNumberOfDriverComparators()) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ @SuppressWarnings("unchecked")
+ final TypeComparator<X> comparator = (TypeComparator<X>) this.inputComparators[index];
+ return comparator;
+ }
+
+
+
+    /** Returns the user function instance of this task (may be null if the driver has none). */
+    @Override
+    public S getStub() {
+        return stub;
+    }
+
+    /** Returns the collector that forwards records to the Tez writers. */
+    @Override
+    public Collector<OT> getOutputCollector() {
+        return output;
+    }
+
+    /** Returns the invokable that owns this task context. */
+    @Override
+    public AbstractInvokable getOwningNepheleTask() {
+        return invokable;
+    }
+
+    /**
+     * Decorates a log message with the task name and the subtask position, formatted as
+     * {@code "<message>:  <task name> (<subtask index + 1>/<number of subtasks>)"}.
+     * Previously returned null, which turned every formatted log message into "null".
+     */
+    @Override
+    public String formatLogString(String message) {
+        return message + ":  " + this.taskName + " (" + (this.indexInSubtaskGroup + 1) + '/' + this.numberOfSubtasks + ')';
+    }
+
+
+ // --------------------------------------------------------------------
+ // Adapted from RegularPactTask
+ // --------------------------------------------------------------------
+
+    /**
+     * Sets up the local strategy configured for the given input.
+     * <ul>
+     *   <li>With NONE, or when no strategy is configured, the raw input iterator is used directly.</li>
+     *   <li>With SORT and COMBININGSORT, a sorter is registered. The input slot is then left null,
+     *       and getInput(int) fetches it lazily from the sorter.</li>
+     * </ul>
+     *
+     * @param inputNum index of the input to set up
+     * @throws Exception if a strategy already exists for this input, the strategy is not
+     *                   recognized, or creating the sorter fails
+     */
+    private void initInputLocalStrategy(int inputNum) throws Exception {
+        // check if there is already a strategy
+        if (this.localStrategies[inputNum] != null) {
+            throw new IllegalStateException("A local strategy has already been set up for input " + inputNum);
+        }
+
+        // now set up the local strategy
+        final LocalStrategy localStrategy = this.config.getInputLocalStrategy(inputNum);
+        if (localStrategy != null) {
+            switch (localStrategy) {
+                case NONE:
+                    // the input is as it is
+                    this.inputs[inputNum] = this.inputIterators[inputNum];
+                    break;
+                case SORT:
+                    @SuppressWarnings({ "rawtypes", "unchecked" })
+                    UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
+                        this.inputIterators[inputNum], this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
+                        this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
+                        this.config.getSpillingThresholdInput(inputNum));
+                    // set the input to null such that it will be lazily fetched from the input strategy
+                    this.inputs[inputNum] = null;
+                    this.localStrategies[inputNum] = sorter;
+                    break;
+                case COMBININGSORT:
+                    // sanity check this special case!
+                    // this still breaks a bit of the abstraction!
+                    // we should have nested configurations for the local strategies to solve that
+                    if (inputNum != 0) {
+                        throw new IllegalStateException("Performing combining sort outside a (group)reduce task!");
+                    }
+
+                    // instantiate ourselves a combiner. we should not use the stub, because the sort and the
+                    // subsequent (group)reduce would otherwise share it multi-threaded
+                    final Class<S> userCodeFunctionType = this.driver.getStubType();
+                    if (userCodeFunctionType == null) {
+                        throw new IllegalStateException("Performing combining sort outside a reduce task!");
+                    }
+                    final S localStub;
+                    try {
+                        localStub = initStub(userCodeFunctionType);
+                    } catch (Exception e) {
+                        // parenthesized: without them the null check applied to the concatenated
+                        // prefix (never null), so the prefix text was lost from the message
+                        throw new RuntimeException("Initializing the user code and the configuration failed" +
+                            (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
+                    }
+
+                    if (!(localStub instanceof GroupCombineFunction)) {
+                        throw new IllegalStateException("Performing combining sort outside a reduce task!");
+                    }
+
+                    @SuppressWarnings({ "rawtypes", "unchecked" })
+                    CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger(
+                        (GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
+                        this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
+                        this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
+                        this.config.getSpillingThresholdInput(inputNum));
+                    cSorter.setUdfConfiguration(this.config.getStubParameters());
+
+                    // set the input to null such that it will be lazily fetched from the input strategy
+                    this.inputs[inputNum] = null;
+                    this.localStrategies[inputNum] = cSorter;
+                    break;
+                default:
+                    throw new Exception("Unrecognized local strategy provided: " + localStrategy.name());
+            }
+        } else {
+            // no local strategy in the config
+            this.inputs[inputNum] = this.inputIterators[inputNum];
+        }
+    }
+
+ private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) throws Exception {
+ TypeComparatorFactory<T> compFact = this.config.getInputComparator(inputNum, this.userCodeClassLoader);
+ if (compFact == null) {
+ throw new Exception("Missing comparator factory for local strategy on input " + inputNum);
+ }
+ return compFact.createComparator();
+ }
+
+ protected S initStub(Class<? super S> stubSuperClass) throws Exception {
+ try {
+ S stub = config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(stubSuperClass, this.userCodeClassLoader);
+ // check if the class is a subclass, if the check is required
+ if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) {
+ throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" +
+ stubSuperClass.getName() + "' as is required.");
+ }
+ FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext);
+ return stub;
+ }
+ catch (ClassCastException ccex) {
+ throw new Exception("The stub class is not a proper subclass of " + stubSuperClass.getName(), ccex);
+ }
+ }
+
+    /**
+     * Creates the input serializer factories (one per input) and the driver comparators.
+     *
+     * @param numInputs number of inputs; one serializer factory is loaded per input
+     * @param numComparators number of driver comparators; when 0, no comparator array is created
+     * @throws Exception if a factory cannot be loaded or a driver comparator factory is missing
+     */
+    protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
+        this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
+        this.inputComparators = numComparators > 0 ? new TypeComparator[numComparators] : null;
+        // NOTE: (re)allocates the raw iterator array; the iterators themselves are filled in by the caller
+        this.inputIterators = new MutableObjectIterator[numInputs];
+
+        for (int i = 0; i < numInputs; i++) {
+            this.inputSerializers[i] = this.config.getInputSerializer(i, this.userCodeClassLoader);
+        }
+
+        // the loop only runs when numComparators > 0, in which case inputComparators is non-null
+        for (int i = 0; i < numComparators; i++) {
+            final TypeComparatorFactory<?> comparatorFactory = this.config.getDriverComparator(i, this.userCodeClassLoader);
+            if (comparatorFactory == null) {
+                throw new Exception("Missing comparator factory for driver comparator " + i);
+            }
+            this.inputComparators[i] = comparatorFactory.createComparator();
+        }
+    }
+
+    /**
+     * Sets up the driver with this object as its task context, then instantiates the UDF if
+     * the driver declares a stub type.
+     *
+     * @throws Exception if the driver setup fails
+     * @throws RuntimeException if instantiating the UDF fails
+     */
+    protected void initialize() throws Exception {
+        // create the operator
+        try {
+            this.driver.setup(this);
+        } catch (Throwable t) {
+            throw new Exception("The driver setup for '" + this.taskName +
+                "' , caused an error: " + t.getMessage(), t);
+        }
+
+        // instantiate the UDF; a null stub type means the driver has no user code
+        try {
+            final Class<? super S> userCodeFunctionType = this.driver.getStubType();
+            if (userCodeFunctionType != null) {
+                this.stub = initStub(userCodeFunctionType);
+            }
+        } catch (Exception e) {
+            // parenthesized: otherwise the null check applies to the concatenated prefix
+            throw new RuntimeException("Initializing the UDF" +
+                (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
+        }
+    }
+
+ /*
+ public RuntimeUDFContext createRuntimeContext() {
+ return new RuntimeUDFContext(this.taskName, this.numberOfSubtasks, this.indexInSubtaskGroup, null);
+ }
+ */
+
+    /**
+     * Closes every local strategy that was created. Failures are logged and otherwise ignored,
+     * so one failing strategy does not prevent the others from being closed.
+     */
+    protected void closeLocalStrategies() {
+        if (this.localStrategies == null) {
+            return;
+        }
+        for (int inputNum = 0; inputNum < this.localStrategies.length; inputNum++) {
+            if (this.localStrategies[inputNum] == null) {
+                continue;
+            }
+            try {
+                this.localStrategies[inputNum].close();
+            } catch (Throwable t) {
+                LOG.error("Error closing local strategy for input " + inputNum, t);
+            }
+        }
+    }
+
+    /**
+     * Executes the task:
+     * <ol>
+     *   <li>prepares the driver;</li>
+     *   <li>opens the UDF;</li>
+     *   <li>runs the driver;</li>
+     *   <li>closes the UDF and the output.</li>
+     * </ol>
+     * The driver is always cleaned up. If a failure happens while the UDF is open, the UDF is
+     * closed on a best-effort basis.
+     *
+     * @throws RuntimeException wrapping (as its cause) any exception raised during processing
+     */
+    protected void run() throws Exception {
+        // ---------------------------- Now, the actual processing starts ------------------------
+        boolean stubOpen = false;
+
+        try {
+            // run the data preparation
+            try {
+                this.driver.prepare();
+            } catch (Throwable t) {
+                throw new Exception("The data preparation for task '" + this.taskName +
+                    "' , caused an error: " + t.getMessage(), t);
+            }
+
+            // open stub implementation
+            if (this.stub != null) {
+                try {
+                    Configuration stubConfig = this.config.getStubParameters();
+                    FunctionUtils.openFunction(this.stub, stubConfig);
+                    stubOpen = true;
+                } catch (Throwable t) {
+                    throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
+                }
+            }
+
+            // run the user code
+            this.driver.run();
+
+            // close. We close here such that a regular close throwing an exception marks a task as failed.
+            if (this.stub != null) {
+                FunctionUtils.closeFunction(this.stub);
+                stubOpen = false;
+            }
+
+            this.output.close();
+        } catch (Exception ex) {
+            // the UDF is still open only if we failed after opening it: release it, but do not
+            // let a secondary failure hide the root cause
+            if (stubOpen) {
+                try {
+                    FunctionUtils.closeFunction(this.stub);
+                } catch (Throwable t) {
+                    LOG.error("Error while closing the user function of task '" + this.taskName + "' after a failure.", t);
+                }
+            }
+            LOG.error("Exception in task '" + this.taskName + "'.", ex);
+            // keep the original exception as the cause so its stack trace is not lost
+            throw new RuntimeException("Exception in TaskContext: " + ex.getMessage(), ex);
+        } finally {
+            this.driver.cleanup();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
new file mode 100644
index 0000000..94a8315
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTaskConfig.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+public class TezTaskConfig extends TaskConfig {
+
+ public static final String TEZ_TASK_CONFIG = "tez.task.flink.processor.taskconfig";
+
+ private static final String NUMBER_SUBTASKS_IN_OUTPUTS = "tez.num_subtasks_in_output";
+
+ private static final String INPUT_SPLIT_PROVIDER = "tez.input_split_provider";
+
+ private static final String INPUT_POSITIONS = "tez.input_positions";
+
+ private static final String INPUT_FORMAT = "tez.input_format";
+
+ private static final String DATASOURCE_PROCESSOR_NAME = "tez.datasource_processor_name";
+
+ public TezTaskConfig(Configuration config) {
+ super(config);
+ }
+
+
+ public void setDatasourceProcessorName(String name) {
+ if (name != null) {
+ this.config.setString(DATASOURCE_PROCESSOR_NAME, name);
+ }
+ }
+
+ public String getDatasourceProcessorName() {
+ return this.config.getString(DATASOURCE_PROCESSOR_NAME, null);
+ }
+
+ public void setNumberSubtasksInOutput(ArrayList<Integer> numberSubtasksInOutputs) {
+ try {
+ InstantiationUtil.writeObjectToConfig(numberSubtasksInOutputs, this.config, NUMBER_SUBTASKS_IN_OUTPUTS);
+ } catch (IOException e) {
+ throw new RuntimeException("Error while writing the input split provider object to the task configuration.");
+ }
+ }
+
+ public ArrayList<Integer> getNumberSubtasksInOutput() {
+ ArrayList<Integer> numberOfSubTasksInOutputs = null;
+ try {
+ numberOfSubTasksInOutputs = (ArrayList) InstantiationUtil.readObjectFromConfig(this.config, NUMBER_SUBTASKS_IN_OUTPUTS, getClass().getClassLoader());
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Error while reading the number of subtasks in outputs object from the task configuration.");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Error while reading the number of subtasks in outpurs object from the task configuration. " +
+ "class not found.");
+ }
+ if (numberOfSubTasksInOutputs == null) {
+ throw new NullPointerException();
+ }
+ return numberOfSubTasksInOutputs;
+
+ }
+
+
+ public void setInputSplitProvider (InputSplitProvider inputSplitProvider) {
+ try {
+ InstantiationUtil.writeObjectToConfig(inputSplitProvider, this.config, INPUT_SPLIT_PROVIDER);
+ } catch (IOException e) {
+ throw new RuntimeException("Error while writing the input split provider object to the task configuration.");
+ }
+ }
+
+ public InputSplitProvider getInputSplitProvider () {
+ InputSplitProvider inputSplitProvider = null;
+ try {
+ inputSplitProvider = (InputSplitProvider) InstantiationUtil.readObjectFromConfig(this.config, INPUT_SPLIT_PROVIDER, getClass().getClassLoader());
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Error while reading the input split provider object from the task configuration.");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Error while reading the input split provider object from the task configuration. " +
+ "ChannelSelector class not found.");
+ }
+ if (inputSplitProvider == null) {
+ throw new NullPointerException();
+ }
+ return inputSplitProvider;
+ }
+
+
+ public void setInputPositions(HashMap<String,ArrayList<Integer>> inputPositions) {
+ try {
+ InstantiationUtil.writeObjectToConfig(inputPositions, this.config, INPUT_POSITIONS);
+ } catch (IOException e) {
+ throw new RuntimeException("Error while writing the input positions object to the task configuration.");
+ }
+ }
+
+ public HashMap<String,ArrayList<Integer>> getInputPositions () {
+ HashMap<String,ArrayList<Integer>> inputPositions = null;
+ try {
+ inputPositions = (HashMap) InstantiationUtil.readObjectFromConfig(this.config, INPUT_POSITIONS, getClass().getClassLoader());
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Error while reading the input positions object from the task configuration.");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Error while reading the input positions object from the task configuration. " +
+ "ChannelSelector class not found.");
+ }
+ if (inputPositions == null) {
+ throw new NullPointerException();
+ }
+ return inputPositions;
+ }
+
+ public void setInputFormat (InputFormat inputFormat) {
+ try {
+ InstantiationUtil.writeObjectToConfig(inputFormat, this.config, INPUT_FORMAT);
+ } catch (IOException e) {
+ throw new RuntimeException("Error while writing the input format object to the task configuration.");
+ }
+ }
+
+ public InputFormat getInputFormat () {
+ InputFormat inputFormat = null;
+ try {
+ inputFormat = (InputFormat) InstantiationUtil.readObjectFromConfig(this.config, INPUT_FORMAT, getClass().getClassLoader());
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Error while reading the input split provider object from the task configuration.");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Error while reading the input split provider object from the task configuration. " +
+ "ChannelSelector class not found.");
+ }
+ if (inputFormat == null) {
+ throw new NullPointerException();
+ }
+ return inputFormat;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
new file mode 100644
index 0000000..7ceeac8
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/UnionProcessor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tez processor that copies every key/value pair from all of its inputs to its single output.
+ */
+public class UnionProcessor extends AbstractLogicalIOProcessor {
+
+    private TezTaskConfig config;
+    protected Map<String, LogicalInput> inputs;
+    protected Map<String, LogicalOutput> outputs;
+    private List<KeyValueReader> readers;
+    private List<KeyValueWriter> writers;
+    private int numInputs;
+    private int numOutputs;
+
+    public UnionProcessor(ProcessorContext context) {
+        super(context);
+    }
+
+    /** Decodes the TezTaskConfig from the user payload and sets its task name to the vertex name. */
+    @Override
+    public void initialize() throws Exception {
+        UserPayload payload = getContext().getUserPayload();
+        Configuration conf = TezUtils.createConfFromUserPayload(payload);
+
+        this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
+        config.setTaskName(getContext().getTaskVertexName());
+    }
+
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+        // processor events are ignored
+    }
+
+    @Override
+    public void close() throws Exception {
+        // nothing to release here
+    }
+
+    /**
+     * Starts all inputs and outputs, then writes every pair of every input to the single output.
+     *
+     * @throws IllegalArgumentException if there is not exactly one output
+     */
+    @Override
+    public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
+        this.inputs = inputs;
+        this.outputs = outputs;
+        // the loops below tolerate null maps, so the sizes must as well
+        this.numInputs = (inputs == null) ? 0 : inputs.size();
+        this.numOutputs = (outputs == null) ? 0 : outputs.size();
+
+        this.readers = new ArrayList<KeyValueReader>(numInputs);
+        if (this.inputs != null) {
+            for (LogicalInput input : this.inputs.values()) {
+                input.start();
+                readers.add((KeyValueReader) input.getReader());
+            }
+        }
+
+        this.writers = new ArrayList<KeyValueWriter>(numOutputs);
+        if (this.outputs != null) {
+            for (LogicalOutput output : this.outputs.values()) {
+                output.start();
+                writers.add((KeyValueWriter) output.getWriter());
+            }
+        }
+
+        Preconditions.checkArgument(writers.size() == 1,
+            "UnionProcessor requires exactly one output, but found %s.", writers.size());
+        KeyValueWriter writer = writers.get(0);
+
+        for (KeyValueReader reader : this.readers) {
+            while (reader.next()) {
+                writer.write(reader.getCurrentKey(), reader.getCurrentValue());
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
new file mode 100644
index 0000000..ef59fd0
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInput.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.input;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class FlinkInput extends AbstractLogicalInput {
+
+ private static final Log LOG = LogFactory.getLog(FlinkInput.class);
+
+ private InputSplit split;
+ private boolean splitIsCreated;
+ private final ReentrantLock rrLock = new ReentrantLock();
+ private final Condition rrInited = rrLock.newCondition();
+
+ public FlinkInput(InputContext inputContext, int numPhysicalInputs) {
+ super(inputContext, numPhysicalInputs);
+ getContext().requestInitialMemory(0l, null); // mandatory call
+ split = null;
+ }
+
+ @Override
+ public void handleEvents(List<Event> inputEvents) throws Exception {
+
+ LOG.info("Received " + inputEvents.size() + " events (should be = 1)");
+
+ Event event = inputEvents.iterator().next();
+
+ Preconditions.checkArgument(event instanceof InputDataInformationEvent,
+ getClass().getSimpleName()
+ + " can only handle a single event of type: "
+ + InputDataInformationEvent.class.getSimpleName());
+
+ initSplitFromEvent ((InputDataInformationEvent)event);
+ }
+
+ private void initSplitFromEvent (InputDataInformationEvent e) throws Exception {
+ rrLock.lock();
+
+ try {
+ ByteString byteString = ByteString.copyFrom(e.getUserPayload());
+ this.split = (InputSplit) InstantiationUtil.deserializeObject(byteString.toByteArray(), getClass().getClassLoader());
+ this.splitIsCreated = true;
+
+ LOG.info ("Initializing input split " + split.getSplitNumber() + ": " + split.toString() + " from event (" + e.getSourceIndex() + "," + e.getTargetIndex() + "): " + e.toString());
+
+ rrInited.signal();
+ }
+ catch (Exception ex) {
+ throw new IOException(
+ "Interrupted waiting for InputSplit initialization");
+ }
+ finally {
+ rrLock.unlock();
+ }
+ }
+
+ @Override
+ public List<Event> close() throws Exception {
+ return null;
+ }
+
+ @Override
+ public void start() throws Exception {
+ }
+
+ @Override
+ public Reader getReader() throws Exception {
+ throw new RuntimeException("FlinkInput does not contain a Reader. Should use getSplit instead");
+ }
+
+ @Override
+ public List<Event> initialize() throws Exception {
+ return null;
+ }
+
+ public InputSplit getSplit () throws Exception {
+
+ rrLock.lock();
+ try {
+ if (!splitIsCreated) {
+ checkAndAwaitSplitInitialization();
+ }
+ }
+ finally {
+ rrLock.unlock();
+ }
+ if (split == null) {
+ LOG.info("Input split has not been created. This should not happen");
+ throw new RuntimeException("Input split has not been created. This should not happen");
+ }
+ return split;
+ }
+
+ void checkAndAwaitSplitInitialization() throws IOException {
+ assert rrLock.getHoldCount() == 1;
+ rrLock.lock();
+ try {
+ rrInited.await();
+ } catch (Exception e) {
+ throw new IOException(
+ "Interrupted waiting for InputSplit initialization");
+ } finally {
+ rrLock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
new file mode 100644
index 0000000..db1261c
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/FlinkInputSplitGenerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.input;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Tez input initializer that creates the input splits of a Flink InputFormat and distributes
+ * them round-robin to the tasks as InputDataInformationEvents.
+ */
+public class FlinkInputSplitGenerator extends InputInitializer {
+
+    private static final Log LOG = LogFactory.getLog(FlinkInputSplitGenerator.class);
+
+    InputFormat format;
+
+    public FlinkInputSplitGenerator(InputInitializerContext initializerContext) {
+        super(initializerContext);
+    }
+
+    /**
+     * Reads the input format from the TezTaskConfig carried in the user payload and creates its
+     * splits. Each split is then serialized into an event addressed to task {@code i % numTargets}.
+     *
+     * @return one event per created split
+     */
+    @Override
+    public List<Event> initialize() throws Exception {
+        Configuration tezConf = TezUtils.createConfFromUserPayload(this.getContext().getUserPayload());
+        TezTaskConfig taskConfig = (TezTaskConfig) EncodingUtils.decodeObjectFromString(
+            tezConf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());
+        this.format = taskConfig.getInputFormat();
+
+        final int numTasks = this.getContext().getNumTasks();
+        // getNumTasks() may be non-positive: use at least one target both for split creation and
+        // for the round-robin assignment below, which would otherwise divide by zero
+        final int numTargets = (numTasks > 0) ? numTasks : 1;
+
+        LOG.info("Creating splits for " + numTasks + " tasks for input format " + format);
+        InputSplit[] splits = format.createInputSplits(numTargets);
+        LOG.info("Created " + splits.length + " input splits for input format " + format);
+
+        LOG.info("Sending input split events");
+        LinkedList<Event> events = new LinkedList<Event>();
+        for (int i = 0; i < splits.length; i++) {
+            final int target = i % numTargets;
+            ByteBuffer payload = ByteBuffer.wrap(InstantiationUtil.serializeObject(splits[i]));
+            InputDataInformationEvent event = InputDataInformationEvent.createWithSerializedPayload(target, payload);
+            event.setTargetIndex(target);
+            events.add(event);
+            LOG.info("Added event of index " + i + ": " + event);
+        }
+        return events;
+    }
+
+    @Override
+    public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
+        // initializer events are ignored
+    }
+
+    @Override
+    public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+        // vertex state updates are ignored
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
new file mode 100644
index 0000000..722f0a1
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/input/TezReaderIterator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.input;
+
+
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+import java.io.IOException;
+
+public class TezReaderIterator<T> implements MutableObjectIterator<T>{
+
+ private KeyValueReader kvReader;
+
+ public TezReaderIterator(KeyValueReader kvReader) {
+ this.kvReader = kvReader;
+ }
+
+ @Override
+ public T next(T reuse) throws IOException {
+ if (kvReader.next()) {
+ Object key = kvReader.getCurrentKey();
+ Object value = kvReader.getCurrentValue();
+ if (!(key instanceof IntWritable)) {
+ throw new IllegalStateException("Wrong key type");
+ }
+ reuse = (T) value;
+ return reuse;
+ }
+ else {
+ return null;
+ }
+ }
+
+ @Override
+ public T next() throws IOException {
+ if (kvReader.next()) {
+ Object key = kvReader.getCurrentKey();
+ Object value = kvReader.getCurrentValue();
+ if (!(key instanceof IntWritable)) {
+ throw new IllegalStateException("Wrong key type");
+ }
+ return (T) value;
+ }
+ else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
new file mode 100644
index 0000000..2358f29
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/SimplePartitioner.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.output;
+
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.tez.runtime.library.api.Partitioner;
+
+public class SimplePartitioner implements Partitioner {
+
+ @Override
+ public int getPartition(Object key, Object value, int numPartitions) {
+ if (!(key instanceof IntWritable)) {
+ throw new IllegalStateException("Partitioning key should be int");
+ }
+ IntWritable channel = (IntWritable) key;
+ return channel.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
new file mode 100644
index 0000000..7e5cd55
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezChannelSelector.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.output;
+
+import java.io.Serializable;
+
/**
 * Decides to which output channels a record is sent.
 *
 * @param <T> the type of the records to route
 */
public interface TezChannelSelector<T> extends Serializable {

	/**
	 * Determines the output channels to which the given record shall be forwarded.
	 *
	 * <p>Implementations may reuse the returned array between calls (TezOutputEmitter does), so
	 * callers should consume it before the next invocation and must not modify it.
	 *
	 * @param record the record to determine the output channels for
	 * @param numberOfOutputChannels the total number of output channels attached to the output
	 * @return a (possibly empty) array of channel indices through which the record shall be
	 *         forwarded
	 */
	int[] selectChannels(T record, int numberOfOutputChannels);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
new file mode 100644
index 0000000..b68e6c8
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputCollector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.output;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+import java.io.IOException;
+import java.util.List;
+
+public class TezOutputCollector<T> implements Collector<T> {
+
+ private List<KeyValueWriter> writers;
+
+ private List<TezChannelSelector<T>> outputEmitters;
+
+ private List<Integer> numberOfStreamsInOutputs;
+
+ private int numOutputs;
+
+ private TypeSerializer<T> serializer;
+
+ public TezOutputCollector(List<KeyValueWriter> writers, List<TezChannelSelector<T>> outputEmitters, TypeSerializer<T> serializer, List<Integer> numberOfStreamsInOutputs) {
+ this.writers = writers;
+ this.outputEmitters = outputEmitters;
+ this.numberOfStreamsInOutputs = numberOfStreamsInOutputs;
+ this.serializer = serializer;
+ this.numOutputs = writers.size();
+ }
+
+ @Override
+ public void collect(T record) {
+ for (int i = 0; i < numOutputs; i++) {
+ KeyValueWriter writer = writers.get(i);
+ TezChannelSelector<T> outputEmitter = outputEmitters.get(i);
+ int numberOfStreamsInOutput = numberOfStreamsInOutputs.get(i);
+ try {
+ for (int channel : outputEmitter.selectChannels(record, numberOfStreamsInOutput)) {
+ IntWritable key = new IntWritable(channel);
+ writer.write(key, record);
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
new file mode 100644
index 0000000..6dcee0b
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/output/TezOutputEmitter.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.runtime.output;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+public class TezOutputEmitter<T> implements TezChannelSelector<T> {
+
+ private final ShipStrategyType strategy; // the shipping strategy used by this output emitter
+
+ private int[] channels; // the reused array defining target channels
+
+ private int nextChannelToSendTo = 0; // counter to go over channels round robin
+
+ private final TypeComparator<T> comparator; // the comparator for hashing / sorting
+
+ // ------------------------------------------------------------------------
+ // Constructors
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new channel selector that distributes data round robin.
+ */
+ public TezOutputEmitter() {
+ this(ShipStrategyType.NONE);
+ }
+
+ /**
+ * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...).
+ *
+ * @param strategy The distribution strategy to be used.
+ */
+ public TezOutputEmitter(ShipStrategyType strategy) {
+ this(strategy, null);
+ }
+
+ /**
+ * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
+ * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
+ *
+ * @param strategy The distribution strategy to be used.
+ * @param comparator The comparator used to hash / compare the records.
+ */
+ public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator) {
+ this(strategy, comparator, null);
+ }
+
+ /**
+ * Creates a new channel selector that uses the given strategy (broadcasting, partitioning, ...)
+ * and uses the supplied comparator to hash / compare records for partitioning them deterministically.
+ *
+ * @param strategy The distribution strategy to be used.
+ * @param comparator The comparator used to hash / compare the records.
+ * @param distr The distribution pattern used in the case of a range partitioning.
+ */
+ public TezOutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, DataDistribution distr) {
+ if (strategy == null) {
+ throw new NullPointerException();
+ }
+
+ this.strategy = strategy;
+ this.comparator = comparator;
+
+ switch (strategy) {
+ case FORWARD:
+ case PARTITION_HASH:
+ case PARTITION_RANGE:
+ case PARTITION_RANDOM:
+ case PARTITION_FORCED_REBALANCE:
+ case BROADCAST:
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid shipping strategy for OutputEmitter: " + strategy.name());
+ }
+
+ if ((strategy == ShipStrategyType.PARTITION_RANGE) && distr == null) {
+ throw new NullPointerException("Data distribution must not be null when the ship strategy is range partitioning.");
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Channel Selection
+ // ------------------------------------------------------------------------
+
+ @Override
+ public final int[] selectChannels(T record, int numberOfChannels) {
+ switch (strategy) {
+ case FORWARD:
+ case PARTITION_RANDOM:
+ case PARTITION_FORCED_REBALANCE:
+ return robin(numberOfChannels);
+ case PARTITION_HASH:
+ return hashPartitionDefault(record, numberOfChannels);
+ case PARTITION_RANGE:
+ return rangePartition(record, numberOfChannels);
+ case BROADCAST:
+ return broadcast(numberOfChannels);
+ default:
+ throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private final int[] robin(int numberOfChannels) {
+ if (this.channels == null || this.channels.length != 1) {
+ this.channels = new int[1];
+ }
+
+ int nextChannel = nextChannelToSendTo + 1;
+ nextChannel = nextChannel < numberOfChannels ? nextChannel : 0;
+
+ this.nextChannelToSendTo = nextChannel;
+ this.channels[0] = nextChannel;
+ return this.channels;
+ }
+
+ private final int[] broadcast(int numberOfChannels) {
+ if (channels == null || channels.length != numberOfChannels) {
+ channels = new int[numberOfChannels];
+ for (int i = 0; i < numberOfChannels; i++) {
+ channels[i] = i;
+ }
+ }
+
+ return channels;
+ }
+
+ private final int[] hashPartitionDefault(T record, int numberOfChannels) {
+ if (channels == null || channels.length != 1) {
+ channels = new int[1];
+ }
+
+ int hash = this.comparator.hash(record);
+
+ hash = murmurHash(hash);
+
+ if (hash >= 0) {
+ this.channels[0] = hash % numberOfChannels;
+ }
+ else if (hash != Integer.MIN_VALUE) {
+ this.channels[0] = -hash % numberOfChannels;
+ }
+ else {
+ this.channels[0] = 0;
+ }
+
+ return this.channels;
+ }
+
+ private final int murmurHash(int k) {
+ k *= 0xcc9e2d51;
+ k = Integer.rotateLeft(k, 15);
+ k *= 0x1b873593;
+
+ k = Integer.rotateLeft(k, 13);
+ k *= 0xe6546b64;
+
+ k ^= 4;
+ k ^= k >>> 16;
+ k *= 0x85ebca6b;
+ k ^= k >>> 13;
+ k *= 0xc2b2ae35;
+ k ^= k >>> 16;
+
+ return k;
+ }
+
+ private final int[] rangePartition(T record, int numberOfChannels) {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
new file mode 100644
index 0000000..39d247c
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/DummyInvokable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.util;
+
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+public class DummyInvokable extends AbstractInvokable {
+
+ private ExecutionConfig executionConfig;
+
+ public DummyInvokable() {
+ }
+
+ public DummyInvokable(ExecutionConfig executionConfig) {
+ this.executionConfig = executionConfig;
+ }
+
+ public void setExecutionConfig(ExecutionConfig executionConfig) {
+ this.executionConfig = executionConfig;
+ }
+
+ @Override
+ public void registerInputOutput() {}
+
+
+ @Override
+ public void invoke() throws Exception {}
+
+ @Override
+ public ExecutionConfig getExecutionConfig() {
+ return executionConfig;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
new file mode 100644
index 0000000..202cb24
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/util/EncodingUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.util;
+
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.commons.codec.binary.Base64;
+
+import java.io.IOException;
+
+public class EncodingUtils {
+
+ public static Object decodeObjectFromString(String encoded, ClassLoader cl) {
+
+ try {
+ if (encoded == null) {
+ return null;
+ }
+ byte[] bytes = Base64.decodeBase64(encoded);
+
+ return InstantiationUtil.deserializeObject(bytes, cl);
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ System.exit(-1);
+ throw new RuntimeException();
+ }
+ catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ System.exit(-1);
+ throw new RuntimeException();
+ }
+ }
+
+ public static String encodeObjectToString(Object o) {
+
+ try {
+ byte[] bytes = InstantiationUtil.serializeObject(o);
+
+ String encoded = Base64.encodeBase64String(bytes);
+ return encoded;
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ System.exit(-1);
+ throw new RuntimeException();
+ }
+ }
+}