Posted to commits@flink.apache.org by hs...@apache.org on 2015/10/06 17:14:23 UTC
[3/5] flink git commit: [FLINK-2815] [REFACTOR] Remove Pact from
class and file names since it is no longer valid reference
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
new file mode 100644
index 0000000..3f94109
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -0,0 +1,1499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization;
+import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.reader.MutableReader;
+import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
+import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.chaining.ChainedDriver;
+import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
+import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator;
+import org.apache.flink.runtime.operators.shipping.OutputCollector;
+import org.apache.flink.runtime.operators.shipping.OutputEmitter;
+import org.apache.flink.runtime.operators.shipping.RecordOutputCollector;
+import org.apache.flink.runtime.operators.shipping.RecordOutputEmitter;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
+import org.apache.flink.runtime.operators.util.CloseableInputProvider;
+import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.runtime.operators.util.ReaderIterator;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The base class for all batch tasks. Encapsulates common behavior and implements the main life-cycle
+ * of the user code.
+ */
+public class BatchTask<S extends Function, OT> extends AbstractInvokable implements TaskContext<S, OT> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(BatchTask.class);
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * The driver that invokes the user code (the stub implementation). The central driver in this task
+ * (further drivers may be chained behind this driver).
+ */
+ protected volatile Driver<S, OT> driver;
+
+ /**
+ * The instantiated user code of this task's main operator (driver). May be null if the operator has no UDF.
+ */
+ protected S stub;
+
+ /**
+ * The UDF's runtime context.
+ */
+ protected DistributedRuntimeUDFContext runtimeUdfContext;
+
+ /**
+ * The collector that forwards the user code's results. May forward to a channel or to chained drivers within
+ * this task.
+ */
+ protected Collector<OT> output;
+
+ /**
+ * The output writers for the data that this task forwards to the next task. The last driver (the central
+ * driver, if no chained drivers exist; otherwise the last chained driver) produces its output to these writers.
+ */
+ protected List<RecordWriter<?>> eventualOutputs;
+
+ /**
+ * The input readers of this task.
+ */
+ protected MutableReader<?>[] inputReaders;
+
+ /**
+ * The input readers for the configured broadcast variables for this task.
+ */
+ protected MutableReader<?>[] broadcastInputReaders;
+
+ /**
+ * The input readers, wrapped in iterators. These precede the application of the local strategies.
+ */
+ protected MutableObjectIterator<?>[] inputIterators;
+
+ /**
+ * The indices of the iterative inputs. Empty, if the task is not iterative.
+ */
+ protected int[] iterativeInputs;
+
+ /**
+ * The indices of the iterative broadcast inputs. Empty, if none of the inputs is iterative.
+ */
+ protected int[] iterativeBroadcastInputs;
+
+ /**
+ * The local strategies that are applied on the inputs.
+ */
+ protected volatile CloseableInputProvider<?>[] localStrategies;
+
+ /**
+ * The optional temp barriers on the inputs for deadlock breaking. They are
+ * optionally resettable.
+ */
+ protected volatile TempBarrier<?>[] tempBarriers;
+
+ /**
+ * The resettable inputs in the case where no temp barrier is needed.
+ */
+ protected volatile SpillingResettableMutableObjectIterator<?>[] resettableInputs;
+
+ /**
+ * The inputs to the operator. They return the readers' data after the application of the local strategy
+ * and the temp-table barrier.
+ */
+ protected MutableObjectIterator<?>[] inputs;
+
+ /**
+ * The serializers for the input data type.
+ */
+ protected TypeSerializerFactory<?>[] inputSerializers;
+
+ /**
+ * The serializers for the broadcast input data types.
+ */
+ protected TypeSerializerFactory<?>[] broadcastInputSerializers;
+
+ /**
+ * The comparators for the central driver.
+ */
+ protected TypeComparator<?>[] inputComparators;
+
+ /**
+ * The task configuration with the setup parameters.
+ */
+ protected TaskConfig config;
+
+ /**
+ * A list of chained drivers, if there are any.
+ */
+ protected ArrayList<ChainedDriver<?, ?>> chainedTasks;
+
+ /**
+ * Certain inputs may be excluded from resetting. For example, the initial partial solution
+ * in an iteration head must not be reset (it is read through the back channel), while all
+ * others are reset.
+ */
+ private boolean[] excludeFromReset;
+
+ /**
+ * Flag indicating for each input whether it is cached and can be reset.
+ */
+ private boolean[] inputIsCached;
+
+ /**
+ * Flag indicating for each input whether it must be asynchronously materialized.
+ */
+ private boolean[] inputIsAsyncMaterialized;
+
+ /**
+ * The amount of memory per input that is dedicated to the materialization.
+ */
+ private int[] materializationMemory;
+
+ /**
+ * The flag that tags the task as still running. Checked periodically to abort processing.
+ */
+ protected volatile boolean running = true;
+
+ /**
+ * The accumulator map used in the RuntimeContext.
+ */
+ protected Map<String, Accumulator<?,?>> accumulatorMap;
+
+ // --------------------------------------------------------------------------------------------
+ // Task Interface
+ // --------------------------------------------------------------------------------------------
+
+
+ /**
+ * Initialization method. Runs in the execution graph setup phase in the JobManager
+ * and as a setup method on the TaskManager.
+ */
+ @Override
+ public void registerInputOutput() throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Start registering input and output."));
+ }
+
+ // obtain task configuration (including stub parameters)
+ Configuration taskConf = getTaskConfiguration();
+ this.config = new TaskConfig(taskConf);
+
+ // now get the operator class which drives the operation
+ final Class<? extends Driver<S, OT>> driverClass = this.config.getDriver();
+ this.driver = InstantiationUtil.instantiate(driverClass, Driver.class);
+
+ // initialize the readers.
+ // this does not yet trigger any stream consuming or processing.
+ initInputReaders();
+ initBroadcastInputReaders();
+
+ // initialize the writers.
+ initOutputs();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Finished registering input and output."));
+ }
+ }
+
+
+ /**
+ * The main work method.
+ */
+ @Override
+ public void invoke() throws Exception {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Start task code."));
+ }
+
+ Environment env = getEnvironment();
+
+ this.runtimeUdfContext = createRuntimeContext(env.getTaskName());
+
+ // whatever happens in this scope, make sure that the local strategies are cleaned up!
+ // note that the initialization of the local strategies is in the try-finally block as well,
+ // so that the thread that creates them catches its own errors that may happen in that process.
+ // this is especially important, since there may be asynchronous closes (such as through canceling).
+ try {
+ // initialize the remaining data structures on the input and trigger the local processing
+ // the local processing includes building the dams / caches
+ try {
+ int numInputs = driver.getNumberOfInputs();
+ int numComparators = driver.getNumberOfDriverComparators();
+ int numBroadcastInputs = this.config.getNumBroadcastInputs();
+
+ initInputsSerializersAndComparators(numInputs, numComparators);
+ initBroadcastInputsSerializers(numBroadcastInputs);
+
+ // set the iterative status for inputs and broadcast inputs
+ {
+ List<Integer> iterativeInputs = new ArrayList<Integer>();
+
+ for (int i = 0; i < numInputs; i++) {
+ final int numberOfEventsUntilInterrupt = getTaskConfig().getNumberOfEventsUntilInterruptInIterativeGate(i);
+
+ if (numberOfEventsUntilInterrupt < 0) {
+ throw new IllegalArgumentException();
+ }
+ else if (numberOfEventsUntilInterrupt > 0) {
+ this.inputReaders[i].setIterativeReader();
+ iterativeInputs.add(i);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Input [" + i + "] reads in supersteps with [" +
+ numberOfEventsUntilInterrupt + "] event(s) till next superstep."));
+ }
+ }
+ }
+ this.iterativeInputs = asArray(iterativeInputs);
+ }
+
+ {
+ List<Integer> iterativeBcInputs = new ArrayList<Integer>();
+
+ for (int i = 0; i < numBroadcastInputs; i++) {
+ final int numberOfEventsUntilInterrupt = getTaskConfig().getNumberOfEventsUntilInterruptInIterativeBroadcastGate(i);
+
+ if (numberOfEventsUntilInterrupt < 0) {
+ throw new IllegalArgumentException();
+ }
+ else if (numberOfEventsUntilInterrupt > 0) {
+ this.broadcastInputReaders[i].setIterativeReader();
+ iterativeBcInputs.add(i);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Broadcast input [" + i + "] reads in supersteps with [" +
+ numberOfEventsUntilInterrupt + "] event(s) till next superstep."));
+ }
+ }
+ }
+ this.iterativeBroadcastInputs = asArray(iterativeBcInputs);
+ }
+
+ initLocalStrategies(numInputs);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Initializing the input processing failed" +
+ (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
+ }
+
+ if (!this.running) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Task cancelled before task code was started."));
+ }
+ return;
+ }
+
+ // pre main-function initialization
+ initialize();
+
+ // read the broadcast variables. they will be released in the finally clause
+ for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) {
+ final String name = this.config.getBroadcastInputName(i);
+ readAndSetBroadcastInput(i, name, this.runtimeUdfContext, 1 /* superstep one for the start */);
+ }
+
+ // the work goes here
+ run();
+ }
+ finally {
+ // clean up in any case!
+ closeLocalStrategiesAndCaches();
+
+ clearReaders(inputReaders);
+ clearWriters(eventualOutputs);
+
+ }
+
+ if (this.running) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Finished task code."));
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Task code cancelled."));
+ }
+ }
+ }
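
The comment at the top of this method describes a guarantee that a small, self-contained sketch makes concrete (plain Java with illustrative names, not the Flink classes): because initialization runs inside the same try whose finally block releases resources, a failure or an asynchronous cancel during initialization still triggers the cleanup.

    public class CleanupGuaranteeDemo {

        public static void main(String[] args) {
            boolean cleaned = false;
            try {
                initialize();    // may fail half-way through
                run();
            } catch (Exception e) {
                System.out.println("caught: " + e.getMessage());
            } finally {
                cleaned = true;  // mirrors closeLocalStrategiesAndCaches(): runs in every case
                System.out.println("cleaned = " + cleaned);
            }
        }

        static void initialize() throws Exception {
            throw new Exception("initialization failed half-way");
        }

        static void run() {
            // never reached in this sketch
        }
    }
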
+
+ @Override
+ public void cancel() throws Exception {
+ this.running = false;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Cancelling task code"));
+ }
+
+ try {
+ if (this.driver != null) {
+ this.driver.cancel();
+ }
+ } finally {
+ closeLocalStrategiesAndCaches();
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Main Work Methods
+ // --------------------------------------------------------------------------------------------
+
+ protected void initialize() throws Exception {
+ // create the operator
+ try {
+ this.driver.setup(this);
+ }
+ catch (Throwable t) {
+ throw new Exception("The driver setup for '" + this.getEnvironment().getTaskName() +
+ "' , caused an error: " + t.getMessage(), t);
+ }
+
+ // instantiate the UDF
+ try {
+ final Class<? super S> userCodeFunctionType = this.driver.getStubType();
+ // if the class is null, the driver has no user code
+ if (userCodeFunctionType != null) {
+ this.stub = initStub(userCodeFunctionType);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Initializing the UDF" +
+ (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
+ }
+ }
+
+ protected <X> void readAndSetBroadcastInput(int inputNum, String bcVarName, DistributedRuntimeUDFContext context, int superstep) throws IOException {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Setting broadcast variable '" + bcVarName + "'" +
+ (superstep > 1 ? ", superstep " + superstep : "")));
+ }
+
+ @SuppressWarnings("unchecked")
+ final TypeSerializerFactory<X> serializerFactory = (TypeSerializerFactory<X>) this.broadcastInputSerializers[inputNum];
+
+ final MutableReader<?> reader = this.broadcastInputReaders[inputNum];
+
+ BroadcastVariableMaterialization<X, ?> variable = getEnvironment().getBroadcastVariableManager().materializeBroadcastVariable(bcVarName, superstep, this, reader, serializerFactory);
+ context.setBroadcastVariable(bcVarName, variable);
+ }
+
+ protected void releaseBroadcastVariables(String bcVarName, int superstep, DistributedRuntimeUDFContext context) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Releasing broadcast variable '" + bcVarName + "'" +
+ (superstep > 1 ? ", superstep " + superstep : "")));
+ }
+
+ getEnvironment().getBroadcastVariableManager().releaseReference(bcVarName, superstep, this);
+ context.clearBroadcastVariable(bcVarName);
+ }
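
For context, this is roughly how a materialized broadcast variable surfaces on the user side in the DataSet API of this era (a sketch, not part of this commit; the name "factor" is an assumption and must match the name used when registering the broadcast set, e.g. via withBroadcastSet(..., "factor")):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    import java.util.List;

    public class MultiplyByBroadcastFactor extends RichMapFunction<Integer, Integer> {

        private int factor;

        @Override
        public void open(Configuration parameters) throws Exception {
            // reads the variable that readAndSetBroadcastInput(...) materialized
            List<Integer> bc = getRuntimeContext().getBroadcastVariable("factor");
            factor = bc.get(0);
        }

        @Override
        public Integer map(Integer value) {
            return value * factor;
        }
    }
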
+
+
+ protected void run() throws Exception {
+ // ---------------------------- Now, the actual processing starts ------------------------
+ // check for asynchronous canceling
+ if (!this.running) {
+ return;
+ }
+
+ boolean stubOpen = false;
+
+ try {
+ // run the data preparation
+ try {
+ this.driver.prepare();
+ }
+ catch (Throwable t) {
+ // if the preparation caused an error, clean up
+ // errors during clean-up are swallowed, because we have already a root exception
+ throw new Exception("The data preparation for task '" + this.getEnvironment().getTaskName() +
+ "' , caused an error: " + t.getMessage(), t);
+ }
+
+ // check for canceling
+ if (!this.running) {
+ return;
+ }
+
+ // start all chained tasks
+ BatchTask.openChainedTasks(this.chainedTasks, this);
+
+ // open stub implementation
+ if (this.stub != null) {
+ try {
+ Configuration stubConfig = this.config.getStubParameters();
+ FunctionUtils.openFunction(this.stub, stubConfig);
+ stubOpen = true;
+ }
+ catch (Throwable t) {
+ throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
+ }
+ }
+
+ // run the user code
+ this.driver.run();
+
+ // close. We close here such that a regular close throwing an exception marks a task as failed.
+ if (this.running && this.stub != null) {
+ FunctionUtils.closeFunction(this.stub);
+ stubOpen = false;
+ }
+
+ this.output.close();
+
+ // close all chained tasks letting them report failure
+ BatchTask.closeChainedTasks(this.chainedTasks, this);
+ }
+ catch (Exception ex) {
+ // close the input, but do not report any exceptions, since we already have another root cause
+ if (stubOpen) {
+ try {
+ FunctionUtils.closeFunction(this.stub);
+ }
+ catch (Throwable t) {
+ // do nothing
+ }
+ }
+
+ // if resettable driver invoke teardown
+ if (this.driver instanceof ResettableDriver) {
+ final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver;
+ try {
+ resDriver.teardown();
+ } catch (Throwable t) {
+ throw new Exception("Error while shutting down an iterative operator: " + t.getMessage(), t);
+ }
+ }
+
+ BatchTask.cancelChainedTasks(this.chainedTasks);
+
+ ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
+
+ if (ex instanceof CancelTaskException) {
+ // forward canceling exception
+ throw ex;
+ }
+ else if (this.running) {
+ // throw only if task was not cancelled. in the case of canceling, exceptions are expected
+ BatchTask.logAndThrowException(ex, this);
+ }
+ }
+ finally {
+ this.driver.cleanup();
+ }
+ }
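
The close() discipline of run() can be reduced to a stand-alone sketch (plain Java stand-ins, not the Flink classes): a close on the success path is allowed to fail the task, while a close on the error path is swallowed so that it cannot mask the original root cause.

    public class CloseDisciplineDemo {

        public static void main(String[] args) {
            boolean stubOpen = true;
            try {
                runUserCode();
            } catch (Exception ex) {
                if (stubOpen) {
                    try {
                        close();
                    } catch (Throwable t) {
                        // swallowed: the original root cause must not be masked
                    }
                }
                System.out.println("reported: " + ex.getMessage());
            }
        }

        static void runUserCode() throws Exception {
            throw new Exception("root cause in user code");
        }

        static void close() throws Exception {
            throw new Exception("close also failed");
        }
    }
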
+
+ protected void closeLocalStrategiesAndCaches() {
+
+ // make sure that all broadcast variable references held by this task are released
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(formatLogString("Releasing all broadcast variables."));
+ }
+
+ getEnvironment().getBroadcastVariableManager().releaseAllReferencesFromTask(this);
+ if (runtimeUdfContext != null) {
+ runtimeUdfContext.clearAllBroadcastVariables();
+ }
+
+ // clean all local strategies and caches/pipeline breakers.
+
+ if (this.localStrategies != null) {
+ for (int i = 0; i < this.localStrategies.length; i++) {
+ if (this.localStrategies[i] != null) {
+ try {
+ this.localStrategies[i].close();
+ } catch (Throwable t) {
+ LOG.error("Error closing local strategy for input " + i, t);
+ }
+ }
+ }
+ }
+ if (this.tempBarriers != null) {
+ for (int i = 0; i < this.tempBarriers.length; i++) {
+ if (this.tempBarriers[i] != null) {
+ try {
+ this.tempBarriers[i].close();
+ } catch (Throwable t) {
+ LOG.error("Error closing temp barrier for input " + i, t);
+ }
+ }
+ }
+ }
+ if (this.resettableInputs != null) {
+ for (int i = 0; i < this.resettableInputs.length; i++) {
+ if (this.resettableInputs[i] != null) {
+ try {
+ this.resettableInputs[i].close();
+ } catch (Throwable t) {
+ LOG.error("Error closing cache for input " + i, t);
+ }
+ }
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Task Setup and Teardown
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * @return the last output collector in the collector chain
+ */
+ @SuppressWarnings("unchecked")
+ protected Collector<OT> getLastOutputCollector() {
+ int numChained = this.chainedTasks.size();
+ return (numChained == 0) ? output : (Collector<OT>) chainedTasks.get(numChained - 1).getOutputCollector();
+ }
+
+ /**
+ * Sets the last output {@link Collector} of the collector chain of this {@link BatchTask}.
+ * <p>
+ * In case of chained tasks, the output collector of the last {@link ChainedDriver} is set. Otherwise it is the
+ * single collector of the {@link BatchTask}.
+ *
+ * @param newOutputCollector new output collector to set as last collector
+ */
+ protected void setLastOutputCollector(Collector<OT> newOutputCollector) {
+ int numChained = this.chainedTasks.size();
+
+ if (numChained == 0) {
+ output = newOutputCollector;
+ return;
+ }
+
+ chainedTasks.get(numChained - 1).setOutputCollector(newOutputCollector);
+ }
+
+ public TaskConfig getLastTasksConfig() {
+ int numChained = this.chainedTasks.size();
+ return (numChained == 0) ? config : chainedTasks.get(numChained - 1).getTaskConfig();
+ }
+
+ protected S initStub(Class<? super S> stubSuperClass) throws Exception {
+ try {
+ ClassLoader userCodeClassLoader = getUserCodeClassLoader();
+ S stub = config.<S>getStubWrapper(userCodeClassLoader).getUserCodeObject(stubSuperClass, userCodeClassLoader);
+ // check if the class is a subclass, if the check is required
+ if (stubSuperClass != null && !stubSuperClass.isAssignableFrom(stub.getClass())) {
+ throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" +
+ stubSuperClass.getName() + "' as is required.");
+ }
+ FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext);
+ return stub;
+ }
+ catch (ClassCastException ccex) {
+ throw new Exception("The stub class is not a proper subclass of " + stubSuperClass.getName(), ccex);
+ }
+ }
+
+ /**
+ * Creates the record readers for the number of inputs as defined by {@link #getNumTaskInputs()}.
+ *
+ * This method requires that the task configuration, the driver, and the user-code class loader are set.
+ */
+ protected void initInputReaders() throws Exception {
+ final int numInputs = getNumTaskInputs();
+ final MutableReader<?>[] inputReaders = new MutableReader<?>[numInputs];
+
+ int currentReaderOffset = 0;
+
+ AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+ AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+
+ for (int i = 0; i < numInputs; i++) {
+ // ---------------- create the input readers ---------------------
+ // in case where a logical input unions multiple physical inputs, create a union reader
+ final int groupSize = this.config.getGroupSize(i);
+
+ if (groupSize == 1) {
+ // non-union case
+ inputReaders[i] = new MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset));
+ } else if (groupSize > 1){
+ // union case
+ InputGate[] readers = new InputGate[groupSize];
+ for (int j = 0; j < groupSize; ++j) {
+ readers[j] = getEnvironment().getInputGate(currentReaderOffset + j);
+ }
+ inputReaders[i] = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers));
+ } else {
+ throw new Exception("Illegal input group size in task configuration: " + groupSize);
+ }
+
+ inputReaders[i].setReporter(reporter);
+
+ currentReaderOffset += groupSize;
+ }
+ this.inputReaders = inputReaders;
+
+ // final sanity check
+ if (currentReaderOffset != this.config.getNumInputs()) {
+ throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
+ }
+ }
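
A worked example of the reader-offset arithmetic above (plain Java; the group sizes are made up): with group sizes {1, 3, 2}, logical input 0 reads gate 0, input 1 unions gates 1 to 3, input 2 unions gates 4 and 5, and the final offset must equal the total gate count, which is exactly the sanity check at the end of the method.

    public class ReaderOffsetDemo {

        public static void main(String[] args) {
            int[] groupSizes = {1, 3, 2};
            int offset = 0;
            for (int i = 0; i < groupSizes.length; i++) {
                int first = offset;
                int last = offset + groupSizes[i] - 1;
                System.out.println("logical input " + i + " -> gates " + first + ".." + last);
                offset += groupSizes[i];
            }
            System.out.println("total gates consumed: " + offset); // must equal config.getNumInputs()
        }
    }
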
+
+ /**
+ * Creates the record readers for the extra broadcast inputs as configured by {@link TaskConfig#getNumBroadcastInputs()}.
+ *
+ * This method requires that the task configuration, the driver, and the user-code class loader are set.
+ */
+ protected void initBroadcastInputReaders() throws Exception {
+ final int numBroadcastInputs = this.config.getNumBroadcastInputs();
+ final MutableReader<?>[] broadcastInputReaders = new MutableReader<?>[numBroadcastInputs];
+
+ int currentReaderOffset = config.getNumInputs();
+
+ for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) {
+ // ---------------- create the input readers ---------------------
+ // in case where a logical input unions multiple physical inputs, create a union reader
+ final int groupSize = this.config.getBroadcastGroupSize(i);
+ if (groupSize == 1) {
+ // non-union case
+ broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset));
+ } else if (groupSize > 1){
+ // union case
+ InputGate[] readers = new InputGate[groupSize];
+ for (int j = 0; j < groupSize; ++j) {
+ readers[j] = getEnvironment().getInputGate(currentReaderOffset + j);
+ }
+ broadcastInputReaders[i] = new MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers));
+ } else {
+ throw new Exception("Illegal input group size in task configuration: " + groupSize);
+ }
+
+ currentReaderOffset += groupSize;
+ }
+ this.broadcastInputReaders = broadcastInputReaders;
+ }
+
+ /**
+ * Creates all the serializers and comparators.
+ */
+ protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
+ this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
+ this.inputComparators = numComparators > 0 ? new TypeComparator<?>[numComparators] : null;
+ this.inputIterators = new MutableObjectIterator<?>[numInputs];
+
+ ClassLoader userCodeClassLoader = getUserCodeClassLoader();
+
+ for (int i = 0; i < numInputs; i++) {
+
+ final TypeSerializerFactory<?> serializerFactory = this.config.getInputSerializer(i, userCodeClassLoader);
+ this.inputSerializers[i] = serializerFactory;
+
+ this.inputIterators[i] = createInputIterator(this.inputReaders[i], this.inputSerializers[i]);
+ }
+
+ // ---------------- create the driver's comparators ---------------------
+ for (int i = 0; i < numComparators; i++) {
+
+ if (this.inputComparators != null) {
+ final TypeComparatorFactory<?> comparatorFactory = this.config.getDriverComparator(i, userCodeClassLoader);
+ this.inputComparators[i] = comparatorFactory.createComparator();
+ }
+ }
+ }
+
+ /**
+ * Creates all the serializers and iterators for the broadcast inputs.
+ */
+ protected void initBroadcastInputsSerializers(int numBroadcastInputs) throws Exception {
+ this.broadcastInputSerializers = new TypeSerializerFactory<?>[numBroadcastInputs];
+
+ ClassLoader userCodeClassLoader = getUserCodeClassLoader();
+
+ for (int i = 0; i < numBroadcastInputs; i++) {
+ // ---------------- create the serializer first ---------------------
+ final TypeSerializerFactory<?> serializerFactory = this.config.getBroadcastInputSerializer(i, userCodeClassLoader);
+ this.broadcastInputSerializers[i] = serializerFactory;
+ }
+ }
+
+ /**
+ * Initializes the local strategies and the materialization barriers for all inputs.
+ *
+ * NOTE: This method must be invoked after the invocation of {@code #initInputReaders()} and
+ * {@code #initInputSerializersAndComparators(int)}!
+ *
+ * @param numInputs The number of inputs whose local strategies are initialized.
+ */
+ protected void initLocalStrategies(int numInputs) throws Exception {
+
+ final MemoryManager memMan = getMemoryManager();
+ final IOManager ioMan = getIOManager();
+
+ this.localStrategies = new CloseableInputProvider<?>[numInputs];
+ this.inputs = new MutableObjectIterator<?>[numInputs];
+ this.excludeFromReset = new boolean[numInputs];
+ this.inputIsCached = new boolean[numInputs];
+ this.inputIsAsyncMaterialized = new boolean[numInputs];
+ this.materializationMemory = new int[numInputs];
+
+ // set up the local strategies first, such that they can work before any temp barrier is created
+ for (int i = 0; i < numInputs; i++) {
+ initInputLocalStrategy(i);
+ }
+
+ // we do another loop over the inputs, because we want to instantiate all
+ // sorters, etc. before requesting the first input (as this call may block)
+
+ // we have two types of materialized inputs, and both are replayable (can act as a cache)
+ // The first variant materializes in a different thread and hence
+ // acts as a pipeline breaker. This one should only be present if a pipeline breaker is needed.
+ // The second variant spills to the side and will not read unless the result is also consumed
+ // in a pipelined fashion.
+ this.resettableInputs = new SpillingResettableMutableObjectIterator<?>[numInputs];
+ this.tempBarriers = new TempBarrier<?>[numInputs];
+
+ for (int i = 0; i < numInputs; i++) {
+ final int memoryPages;
+ final boolean async = this.config.isInputAsynchronouslyMaterialized(i);
+ final boolean cached = this.config.isInputCached(i);
+
+ this.inputIsAsyncMaterialized[i] = async;
+ this.inputIsCached[i] = cached;
+
+ if (async || cached) {
+ memoryPages = memMan.computeNumberOfPages(this.config.getRelativeInputMaterializationMemory(i));
+ if (memoryPages <= 0) {
+ throw new Exception("Input marked as materialized/cached, but no memory for materialization provided.");
+ }
+ this.materializationMemory[i] = memoryPages;
+ } else {
+ memoryPages = 0;
+ }
+
+ if (async) {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ TempBarrier<?> barrier = new TempBarrier(this, getInput(i), this.inputSerializers[i], memMan, ioMan, memoryPages);
+ barrier.startReading();
+ this.tempBarriers[i] = barrier;
+ this.inputs[i] = null;
+ } else if (cached) {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ SpillingResettableMutableObjectIterator<?> iter = new SpillingResettableMutableObjectIterator(
+ getInput(i), this.inputSerializers[i].getSerializer(), getMemoryManager(), getIOManager(), memoryPages, this);
+ this.resettableInputs[i] = iter;
+ this.inputs[i] = iter;
+ }
+ }
+ }
+
+ protected void resetAllInputs() throws Exception {
+
+ // first we need to make sure that caches consume remaining data
+ // NOTE: we need to do this before closing the local strategies
+ for (int i = 0; i < this.inputs.length; i++) {
+
+ if (this.inputIsCached[i] && this.resettableInputs[i] != null) {
+ this.resettableInputs[i].consumeAndCacheRemainingData();
+ }
+ }
+
+ // close all local strategies. they will either get re-initialized, or we have
+ // already read them and their data is cached
+ for (int i = 0; i < this.localStrategies.length; i++) {
+ if (this.localStrategies[i] != null) {
+ this.localStrategies[i].close();
+ this.localStrategies[i] = null;
+ }
+ }
+
+ final MemoryManager memMan = getMemoryManager();
+ final IOManager ioMan = getIOManager();
+
+ // reset the caches, or re-run the input local strategy
+ for (int i = 0; i < this.inputs.length; i++) {
+ if (this.excludeFromReset[i]) {
+ if (this.tempBarriers[i] != null) {
+ this.tempBarriers[i].close();
+ this.tempBarriers[i] = null;
+ } else if (this.resettableInputs[i] != null) {
+ this.resettableInputs[i].close();
+ this.resettableInputs[i] = null;
+ }
+ } else {
+ // make sure the inputs are not available directly, but are lazily fetched again
+ this.inputs[i] = null;
+
+ if (this.inputIsCached[i]) {
+ if (this.tempBarriers[i] != null) {
+ this.inputs[i] = this.tempBarriers[i].getIterator();
+ } else if (this.resettableInputs[i] != null) {
+ this.resettableInputs[i].reset();
+ this.inputs[i] = this.resettableInputs[i];
+ } else {
+ throw new RuntimeException("Found a resettable input, but no temp barrier and no resettable iterator.");
+ }
+ } else {
+ // close the async barrier if there is one
+ if (this.tempBarriers[i] != null) {
+ this.tempBarriers[i].close();
+ }
+
+ // recreate the local strategy
+ initInputLocalStrategy(i);
+
+ if (this.inputIsAsyncMaterialized[i]) {
+ final int pages = this.materializationMemory[i];
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ TempBarrier<?> barrier = new TempBarrier(this, getInput(i), this.inputSerializers[i], memMan, ioMan, pages);
+ barrier.startReading();
+ this.tempBarriers[i] = barrier;
+ this.inputs[i] = null;
+ }
+ }
+ }
+ }
+ }
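
The cache-and-reset contract that resetAllInputs() relies on can be sketched in plain Java (an illustrative stand-in, not SpillingResettableMutableObjectIterator): remaining elements are drained into the cache before the strategies are closed, and reset() replays the cached data from the start.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class CachingResettableIterator<T> implements Iterator<T> {

        private final Iterator<T> source;
        private final List<T> cache = new ArrayList<>();
        private Iterator<T> replay;

        public CachingResettableIterator(Iterator<T> source) {
            this.source = source;
        }

        @Override
        public boolean hasNext() {
            return replay != null ? replay.hasNext() : source.hasNext();
        }

        @Override
        public T next() {
            if (replay != null) {
                return replay.next();
            }
            T value = source.next();
            cache.add(value); // remember every element for later replays
            return value;
        }

        public void consumeAndCacheRemainingData() {
            while (source.hasNext()) {
                cache.add(source.next());
            }
        }

        public void reset() {
            replay = cache.iterator(); // subsequent reads come from the cache
        }
    }
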
+
+ protected void excludeFromReset(int inputNum) {
+ this.excludeFromReset[inputNum] = true;
+ }
+
+ private void initInputLocalStrategy(int inputNum) throws Exception {
+ // check if there is already a strategy
+ if (this.localStrategies[inputNum] != null) {
+ throw new IllegalStateException();
+ }
+
+ // now set up the local strategy
+ final LocalStrategy localStrategy = this.config.getInputLocalStrategy(inputNum);
+ if (localStrategy != null) {
+ switch (localStrategy) {
+ case NONE:
+ // the input is as it is
+ this.inputs[inputNum] = this.inputIterators[inputNum];
+ break;
+ case SORT:
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
+ this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
+ this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
+ this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled());
+ // set the input to null such that it will be lazily fetched from the input strategy
+ this.inputs[inputNum] = null;
+ this.localStrategies[inputNum] = sorter;
+ break;
+ case COMBININGSORT:
+ // sanity check this special case!
+ // this still breaks a bit of the abstraction!
+ // we should have nested configurations for the local strategies to solve that
+ if (inputNum != 0) {
+ throw new IllegalStateException("Performing combining sort outside a (group)reduce task!");
+ }
+
+ // instantiate a combiner ourselves. we should not use the stub, because the sort and the
+ // subsequent (group)reduce would otherwise share it across multiple threads
+ final Class<S> userCodeFunctionType = this.driver.getStubType();
+ if (userCodeFunctionType == null) {
+ throw new IllegalStateException("Performing combining sort outside a reduce task!");
+ }
+ final S localStub;
+ try {
+ localStub = initStub(userCodeFunctionType);
+ } catch (Exception e) {
+ throw new RuntimeException("Initializing the user code and the configuration failed" +
+ (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
+ }
+
+ if (!(localStub instanceof GroupCombineFunction)) {
+ throw new IllegalStateException("Performing combining sort outside a reduce task!");
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger(
+ (GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
+ this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
+ this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
+ this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled());
+ cSorter.setUdfConfiguration(this.config.getStubParameters());
+
+ // set the input to null such that it will be lazily fetched from the input strategy
+ this.inputs[inputNum] = null;
+ this.localStrategies[inputNum] = cSorter;
+ break;
+ default:
+ throw new Exception("Unrecognized local strategy provided: " + localStrategy.name());
+ }
+ } else {
+ // no local strategy in the config
+ this.inputs[inputNum] = this.inputIterators[inputNum];
+ }
+ }
+
+ private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) throws Exception {
+ TypeComparatorFactory<T> compFact = this.config.getInputComparator(inputNum, getUserCodeClassLoader());
+ if (compFact == null) {
+ throw new Exception("Missing comparator factory for local strategy on input " + inputNum);
+ }
+ return compFact.createComparator();
+ }
+
+ protected MutableObjectIterator<?> createInputIterator(MutableReader<?> inputReader, TypeSerializerFactory<?> serializerFactory) {
+ @SuppressWarnings("unchecked")
+ MutableReader<DeserializationDelegate<?>> reader = (MutableReader<DeserializationDelegate<?>>) inputReader;
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ final MutableObjectIterator<?> iter = new ReaderIterator(reader, serializerFactory.getSerializer());
+ return iter;
+ }
+
+ protected int getNumTaskInputs() {
+ return this.driver.getNumberOfInputs();
+ }
+
+ /**
+ * Creates a writer for each output. Creates an OutputCollector which forwards its input to all writers.
+ * The output collector applies the configured shipping strategies for each writer.
+ */
+ protected void initOutputs() throws Exception {
+ this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>();
+ this.eventualOutputs = new ArrayList<RecordWriter<?>>();
+
+ ClassLoader userCodeClassLoader = getUserCodeClassLoader();
+
+ AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
+ AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
+
+ this.accumulatorMap = accumulatorRegistry.getUserMap();
+
+ this.output = initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs,
+ this.getExecutionConfig(), reporter, this.accumulatorMap);
+ }
+
+ public DistributedRuntimeUDFContext createRuntimeContext(String taskName) {
+ Environment env = getEnvironment();
+
+ return new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(),
+ env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(),
+ env.getDistributedCacheEntries(), this.accumulatorMap);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Task Context Signature
+ // -------------------------------------------------------------------------------------------
+
+ @Override
+ public TaskConfig getTaskConfig() {
+ return this.config;
+ }
+
+ @Override
+ public TaskManagerRuntimeInfo getTaskManagerInfo() {
+ return getEnvironment().getTaskManagerInfo();
+ }
+
+ @Override
+ public MemoryManager getMemoryManager() {
+ return getEnvironment().getMemoryManager();
+ }
+
+ @Override
+ public IOManager getIOManager() {
+ return getEnvironment().getIOManager();
+ }
+
+ @Override
+ public S getStub() {
+ return this.stub;
+ }
+
+ @Override
+ public Collector<OT> getOutputCollector() {
+ return this.output;
+ }
+
+ @Override
+ public AbstractInvokable getOwningNepheleTask() {
+ return this;
+ }
+
+ @Override
+ public String formatLogString(String message) {
+ return constructLogString(message, getEnvironment().getTaskName(), this);
+ }
+
+ @Override
+ public <X> MutableObjectIterator<X> getInput(int index) {
+ if (index < 0 || index >= this.driver.getNumberOfInputs()) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ // check for lazy assignment from input strategies
+ if (this.inputs[index] != null) {
+ @SuppressWarnings("unchecked")
+ MutableObjectIterator<X> in = (MutableObjectIterator<X>) this.inputs[index];
+ return in;
+ } else {
+ final MutableObjectIterator<X> in;
+ try {
+ if (this.tempBarriers[index] != null) {
+ @SuppressWarnings("unchecked")
+ MutableObjectIterator<X> iter = (MutableObjectIterator<X>) this.tempBarriers[index].getIterator();
+ in = iter;
+ } else if (this.localStrategies[index] != null) {
+ @SuppressWarnings("unchecked")
+ MutableObjectIterator<X> iter = (MutableObjectIterator<X>) this.localStrategies[index].getIterator();
+ in = iter;
+ } else {
+ throw new RuntimeException("Bug: null input iterator, null temp barrier, and null local strategy.");
+ }
+ this.inputs[index] = in;
+ return in;
+ } catch (InterruptedException iex) {
+ throw new RuntimeException("Interrupted while waiting for input " + index + " to become available.", iex);
+ } catch (IOException ioex) {
+ throw new RuntimeException("An I/O Exception occurred while obtaining input " + index + ".", ioex);
+ }
+ }
+ }
+
+
+ @Override
+ public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
+ if (index < 0 || index >= this.driver.getNumberOfInputs()) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ @SuppressWarnings("unchecked")
+ final TypeSerializerFactory<X> serializerFactory = (TypeSerializerFactory<X>) this.inputSerializers[index];
+ return serializerFactory;
+ }
+
+
+ @Override
+ public <X> TypeComparator<X> getDriverComparator(int index) {
+ if (this.inputComparators == null) {
+ throw new IllegalStateException("Comparators have not been created!");
+ }
+ else if (index < 0 || index >= this.driver.getNumberOfDriverComparators()) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ @SuppressWarnings("unchecked")
+ final TypeComparator<X> comparator = (TypeComparator<X>) this.inputComparators[index];
+ return comparator;
+ }
+
+ // ============================================================================================
+ // Static Utilities
+ //
+ // Utilities are consolidated here to ensure a uniform way of running,
+ // logging, exception handling, and error messages.
+ // ============================================================================================
+
+ // --------------------------------------------------------------------------------------------
+ // Logging
+ // --------------------------------------------------------------------------------------------
+ /**
+ * Utility function that composes a string for logging purposes. The string includes the given message,
+ * the given name of the task and the index in its subtask group as well as the number of instances
+ * that exist in its subtask group.
+ *
+ * @param message The main message for the log.
+ * @param taskName The name of the task.
+ * @param parent The nephele task that contains the code producing the message.
+ *
+ * @return The string for logging.
+ */
+ public static String constructLogString(String message, String taskName, AbstractInvokable parent) {
+ return message + ": " + taskName + " (" + (parent.getEnvironment().getIndexInSubtaskGroup() + 1) +
+ '/' + parent.getEnvironment().getNumberOfSubtasks() + ')';
+ }
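
For example, for the third of eight parallel subtasks of a task named "Reduce (SUM)", constructLogString("Start task code", "Reduce (SUM)", parent) yields "Start task code: Reduce (SUM) (3/8)"; the subtask index is shifted from zero-based to one-based for display.
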
+
+ /**
+ * Prints an error message and throws the given exception. If the exception is of the type
+ * {@link ExceptionInChainedStubException} then the chain of contained exceptions is followed
+ * until an exception of a different type is found.
+ *
+ * @param ex The exception to be thrown.
+ * @param parent The parent task, whose information is included in the log message.
+ * @throws Exception Always thrown.
+ */
+ public static void logAndThrowException(Exception ex, AbstractInvokable parent) throws Exception {
+ String taskName;
+ if (ex instanceof ExceptionInChainedStubException) {
+ do {
+ ExceptionInChainedStubException cex = (ExceptionInChainedStubException) ex;
+ taskName = cex.getTaskName();
+ ex = cex.getWrappedException();
+ } while (ex instanceof ExceptionInChainedStubException);
+ } else {
+ taskName = parent.getEnvironment().getTaskName();
+ }
+
+ if (LOG.isErrorEnabled()) {
+ LOG.error(constructLogString("Error in task code", taskName, parent), ex);
+ }
+
+ throw ex;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Result Shipping and Chained Tasks
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Creates the {@link Collector} for the given task, as described by the given configuration. The
+ * output collector contains the writers that forward the data to the different tasks that the given task
+ * is connected to. Each writer applies the partitioning as described in the configuration.
+ *
+ * @param task The task that the output collector is created for.
+ * @param config The configuration describing the output shipping strategies.
+ * @param cl The classloader used to load user defined types.
+ * @param eventualOutputs The output writers that this task forwards to the next task for each output.
+ * @param outputOffset The offset at which to start fetching the writers for the outputs.
+ * @param numOutputs The number of outputs described in the configuration.
+ *
+ * @return The OutputCollector that data produced in this task is submitted to.
+ */
+ public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl,
+ List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs, AccumulatorRegistry.Reporter reporter) throws Exception
+ {
+ if (numOutputs == 0) {
+ return null;
+ }
+
+ // get the factory for the serializer
+ final TypeSerializerFactory<T> serializerFactory = config.getOutputSerializer(cl);
+
+ // special case the Record
+ if (serializerFactory.getDataType().equals(Record.class)) {
+ final List<RecordWriter<Record>> writers = new ArrayList<RecordWriter<Record>>(numOutputs);
+
+ // create a writer for each output
+ for (int i = 0; i < numOutputs; i++) {
+ // create the OutputEmitter from output ship strategy
+ final ShipStrategyType strategy = config.getOutputShipStrategy(i);
+ final TypeComparatorFactory<?> compFact = config.getOutputComparator(i, cl);
+ final RecordOutputEmitter oe;
+ if (compFact == null) {
+ oe = new RecordOutputEmitter(strategy);
+ } else {
+ @SuppressWarnings("unchecked")
+ TypeComparator<Record> comparator = (TypeComparator<Record>) compFact.createComparator();
+ if (!comparator.supportsCompareAgainstReference()) {
+ throw new Exception("Incompatibe serializer-/comparator factories.");
+ }
+ final DataDistribution distribution = config.getOutputDataDistribution(i, cl);
+ final Partitioner<?> partitioner = config.getOutputPartitioner(i, cl);
+
+ oe = new RecordOutputEmitter(strategy, comparator, partitioner, distribution);
+ }
+
+ // setup accumulator counters
+ final RecordWriter<Record> recordWriter = new RecordWriter<Record>(task.getEnvironment().getWriter(outputOffset + i), oe);
+ recordWriter.setReporter(reporter);
+
+ writers.add(recordWriter);
+ }
+ if (eventualOutputs != null) {
+ eventualOutputs.addAll(writers);
+ }
+
+ @SuppressWarnings("unchecked")
+ final Collector<T> outColl = (Collector<T>) new RecordOutputCollector(writers);
+ return outColl;
+ }
+ else {
+ // generic case
+ final List<RecordWriter<SerializationDelegate<T>>> writers = new ArrayList<RecordWriter<SerializationDelegate<T>>>(numOutputs);
+
+ // create a writer for each output
+ for (int i = 0; i < numOutputs; i++)
+ {
+ // create the OutputEmitter from output ship strategy
+ final ShipStrategyType strategy = config.getOutputShipStrategy(i);
+ final TypeComparatorFactory<T> compFactory = config.getOutputComparator(i, cl);
+
+ final ChannelSelector<SerializationDelegate<T>> oe;
+ if (compFactory == null) {
+ oe = new OutputEmitter<T>(strategy);
+ }
+ else {
+ final DataDistribution dataDist = config.getOutputDataDistribution(i, cl);
+ final Partitioner<?> partitioner = config.getOutputPartitioner(i, cl);
+
+ final TypeComparator<T> comparator = compFactory.createComparator();
+ oe = new OutputEmitter<T>(strategy, comparator, partitioner, dataDist);
+ }
+
+ final RecordWriter<SerializationDelegate<T>> recordWriter =
+ new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);
+
+ // setup live accumulator counters
+ recordWriter.setReporter(reporter);
+
+ writers.add(recordWriter);
+ }
+ if (eventualOutputs != null) {
+ eventualOutputs.addAll(writers);
+ }
+ return new OutputCollector<T>(writers, serializerFactory.getSerializer());
+ }
+ }
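
A minimal sketch of the fan-out performed by the returned collector (plain Java stand-ins for Collector and RecordWriter, not the Flink interfaces): every record is handed to each writer, and each writer applies its own ship strategy independently.

    import java.util.List;
    import java.util.function.Consumer;

    public class FanOutCollector<T> {

        private final List<Consumer<T>> writers; // stand-ins for RecordWriter

        public FanOutCollector(List<Consumer<T>> writers) {
            this.writers = writers;
        }

        public void collect(T record) {
            for (Consumer<T> writer : writers) {
                writer.accept(record); // each writer partitions on its own
            }
        }
    }
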
+
+ /**
+ * Creates a writer for each output. Creates an OutputCollector which forwards its input to all writers.
+ * The output collector applies the configured shipping strategy.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> Collector<T> initOutputs(AbstractInvokable nepheleTask, ClassLoader cl, TaskConfig config,
+ List<ChainedDriver<?, ?>> chainedTasksTarget,
+ List<RecordWriter<?>> eventualOutputs,
+ ExecutionConfig executionConfig,
+ AccumulatorRegistry.Reporter reporter,
+ Map<String, Accumulator<?,?>> accumulatorMap)
+ throws Exception
+ {
+ final int numOutputs = config.getNumOutputs();
+
+ // check whether we got any chained tasks
+ final int numChained = config.getNumberOfChainedStubs();
+ if (numChained > 0) {
+ // got chained stubs. that means that this one may only have a single forward connection
+ if (numOutputs != 1 || config.getOutputShipStrategy(0) != ShipStrategyType.FORWARD) {
+ throw new RuntimeException("Plan Generation Bug: Found a chained stub that is not connected via an only forward connection.");
+ }
+
+ // instantiate each task
+ @SuppressWarnings("rawtypes")
+ Collector previous = null;
+ for (int i = numChained - 1; i >= 0; --i)
+ {
+ // get the task first
+ final ChainedDriver<?, ?> ct;
+ try {
+ Class<? extends ChainedDriver<?, ?>> ctc = config.getChainedTask(i);
+ ct = ctc.newInstance();
+ }
+ catch (Exception ex) {
+ throw new RuntimeException("Could not instantiate chained task driver.", ex);
+ }
+
+ // get the configuration for the task
+ final TaskConfig chainedStubConf = config.getChainedStubConfig(i);
+ final String taskName = config.getChainedTaskName(i);
+
+ if (i == numChained - 1) {
+ // last in chain, instantiate the output collector for this task
+ previous = getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs(), reporter);
+ }
+
+ ct.setup(chainedStubConf, taskName, previous, nepheleTask, cl, executionConfig, accumulatorMap);
+ chainedTasksTarget.add(0, ct);
+
+ previous = ct;
+ }
+ // the collector of the first in the chain is the collector for the nephele task
+ return (Collector<T>) previous;
+ }
+ // else
+
+ // instantiate the output collector the default way from this configuration
+ return getOutputCollector(nepheleTask, config, cl, eventualOutputs, 0, numOutputs, reporter);
+ }
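
The reverse wiring above, which builds the chain from last to first so that each driver collects into its successor, can be mimicked with plain Java function composition (illustrative only, not the Flink classes):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Function;

    public class ChainWiringDemo {

        public static void main(String[] args) {
            List<Function<String, String>> chained = new ArrayList<>();
            chained.add(s -> s + "->A");
            chained.add(s -> s + "->B");

            // terminal collector (stands in for the real output collector)
            Function<String, String> previous = s -> s + "->OUT";
            for (int i = chained.size() - 1; i >= 0; --i) {
                Function<String, String> next = previous;
                Function<String, String> self = chained.get(i);
                previous = s -> next.apply(self.apply(s)); // each step feeds its successor
            }
            System.out.println(previous.apply("rec")); // prints: rec->A->B->OUT
        }
    }
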
+
+ // --------------------------------------------------------------------------------------------
+ // User Code LifeCycle
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Opens the given stub using its {@link org.apache.flink.api.common.functions.RichFunction#open(Configuration)} method. If the open call produces
+ * an exception, a new exception with a standard error message is created, using the encountered exception
+ * as its cause.
+ *
+ * @param stub The user code instance to be opened.
+ * @param parameters The parameters supplied to the user code.
+ *
+ * @throws Exception Thrown, if the user code's open method produces an exception.
+ */
+ public static void openUserCode(Function stub, Configuration parameters) throws Exception {
+ try {
+ FunctionUtils.openFunction(stub, parameters);
+ } catch (Throwable t) {
+ throw new Exception("The user defined 'open(Configuration)' method in " + stub.getClass().toString() + " caused an exception: " + t.getMessage(), t);
+ }
+ }
+
+ /**
+ * Closes the given stub using its {@link org.apache.flink.api.common.functions.RichFunction#close()} method. If the close call produces
+ * an exception, a new exception with a standard error message is created, using the encountered exception
+ * as its cause.
+ *
+ * @param stub The user code instance to be closed.
+ *
+ * @throws Exception Thrown, if the user code's close method produces an exception.
+ */
+ public static void closeUserCode(Function stub) throws Exception {
+ try {
+ FunctionUtils.closeFunction(stub);
+ } catch (Throwable t) {
+ throw new Exception("The user defined 'close()' method caused an exception: " + t.getMessage(), t);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Chained Task LifeCycle
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Opens all chained tasks, in the order as they are stored in the array. The opening process
+ * creates a standardized log info message.
+ *
+ * @param tasks The tasks to be opened.
+ * @param parent The parent task, used to obtain parameters to include in the log message.
+ * @throws Exception Thrown, if the opening encounters an exception.
+ */
+ public static void openChainedTasks(List<ChainedDriver<?, ?>> tasks, AbstractInvokable parent) throws Exception {
+ // start all chained tasks
+ for (int i = 0; i < tasks.size(); i++) {
+ final ChainedDriver<?, ?> task = tasks.get(i);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(constructLogString("Start task code", task.getTaskName(), parent));
+ }
+ task.openTask();
+ }
+ }
+
+ /**
+ * Closes all chained tasks, in the order as they are stored in the array. The closing process
+ * creates a standardized log info message.
+ *
+ * @param tasks The tasks to be closed.
+ * @param parent The parent task, used to obtain parameters to include in the log message.
+ * @throws Exception Thrown, if the closing encounters an exception.
+ */
+ public static void closeChainedTasks(List<ChainedDriver<?, ?>> tasks, AbstractInvokable parent) throws Exception {
+ for (int i = 0; i < tasks.size(); i++) {
+ final ChainedDriver<?, ?> task = tasks.get(i);
+ task.closeTask();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(constructLogString("Finished task code", task.getTaskName(), parent));
+ }
+ }
+ }
+
+ /**
+ * Cancels all tasks via their {@link ChainedDriver#cancelTask()} method. Any occurring exceptions
+ * and errors are suppressed, such that the canceling method of every task is invoked in all cases.
+ *
+ * @param tasks The tasks to be canceled.
+ */
+ public static void cancelChainedTasks(List<ChainedDriver<?, ?>> tasks) {
+ for (int i = 0; i < tasks.size(); i++) {
+ try {
+ tasks.get(i).cancelTask();
+ } catch (Throwable t) {
+ // do nothing
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Miscellaneous Utilities
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Instantiates a user code class from its definition in the task configuration.
+ * The class is instantiated without arguments using the null-ary constructor. Instantiation
+ * will fail if this constructor does not exist or is not public.
+ *
+ * @param <T> The generic type of the user code class.
+ * @param config The task configuration containing the class description.
+ * @param cl The class loader to be used to load the class.
+ * @param superClass The super class that the user code class extends or implements, for type checking.
+ *
+ * @return An instance of the user code class.
+ */
+ public static <T> T instantiateUserCode(TaskConfig config, ClassLoader cl, Class<? super T> superClass) {
+ try {
+ T stub = config.<T>getStubWrapper(cl).getUserCodeObject(superClass, cl);
+ // check if the class is a subclass, if the check is required
+ if (superClass != null && !superClass.isAssignableFrom(stub.getClass())) {
+ throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" +
+ superClass.getName() + "' as is required.");
+ }
+ return stub;
+ }
+ catch (ClassCastException ccex) {
+ throw new RuntimeException("The UDF class is not a proper subclass of " + superClass.getName(), ccex);
+ }
+ }
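
A self-contained sketch of the reflective instantiation and type check performed here (hypothetical classes; the real code obtains the class from the TaskConfig):

    public class InstantiateDemo {

        interface UserFunction {}

        static class MyMap implements UserFunction {}

        static <T> T instantiate(Class<T> clazz, Class<? super T> superClass) throws Exception {
            T instance = clazz.getDeclaredConstructor().newInstance(); // requires an accessible no-arg constructor
            if (superClass != null && !superClass.isAssignableFrom(instance.getClass())) {
                throw new RuntimeException(instance.getClass().getName()
                        + " is not a subclass of " + superClass.getName());
            }
            return instance;
        }

        public static void main(String[] args) throws Exception {
            UserFunction f = instantiate(MyMap.class, UserFunction.class);
            System.out.println(f.getClass().getSimpleName()); // prints: MyMap
        }
    }
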
+
+ private static int[] asArray(List<Integer> list) {
+ int[] a = new int[list.size()];
+
+ int i = 0;
+ for (int val : list) {
+ a[i++] = val;
+ }
+ return a;
+ }
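
Since Java 8, the same unboxing can be expressed in one line where dependency constraints allow it (a possible simplification, not part of this commit):

    int[] a = list.stream().mapToInt(Integer::intValue).toArray();
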
+
+ public static void clearWriters(List<RecordWriter<?>> writers) {
+ for (RecordWriter<?> writer : writers) {
+ writer.clearBuffers();
+ }
+ }
+
+ public static void clearReaders(MutableReader<?>[] readers) {
+ for (MutableReader<?> reader : readers) {
+ reader.clearBuffers();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
index dfc4e91..665ab0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
@@ -43,12 +43,12 @@ import org.apache.flink.util.MutableObjectIterator;
*
* @see org.apache.flink.api.common.functions.CoGroupFunction
*/
-public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
+public class CoGroupDriver<IT1, IT2, OT> implements Driver<CoGroupFunction<IT1, IT2, OT>, OT> {
private static final Logger LOG = LoggerFactory.getLogger(CoGroupDriver.class);
- private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
+ private TaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
private CoGroupTaskIterator<IT1, IT2> coGroupIterator; // the iterator that does the actual cogroup
@@ -60,7 +60,7 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<I
@Override
- public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
+ public void setup(TaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupRawDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupRawDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupRawDriver.java
index 7abad5d..0487904 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupRawDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupRawDriver.java
@@ -26,17 +26,17 @@ import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class CoGroupRawDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
+public class CoGroupRawDriver<IT1, IT2, OT> implements Driver<CoGroupFunction<IT1, IT2, OT>, OT> {
private static final Logger LOG = LoggerFactory.getLogger(CoGroupRawDriver.class);
- private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
+ private TaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
private SimpleIterable<IT1> coGroupIterator1;
private SimpleIterable<IT2> coGroupIterator2;
@Override
- public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
+ public void setup(TaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index 97d6e51..07fb352 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
-import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask;
+import org.apache.flink.runtime.iterative.task.AbstractIterativeTask;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
@@ -37,9 +37,9 @@ import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.runtime.util.SingleElementIterator;
import org.apache.flink.util.Collector;
-public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
+public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
- private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
+ private TaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
private CompactingHashTable<IT1> hashTable;
@@ -63,7 +63,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
// --------------------------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
+ public void setup(TaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -106,8 +106,8 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
final TypeComparator<IT1> solutionSetComparator;
// grab a handle to the hash table from the iteration broker
- if (taskContext instanceof AbstractIterativePactTask) {
- AbstractIterativePactTask<?, ?> iterativeTaskContext = (AbstractIterativePactTask<?, ?>) taskContext;
+ if (taskContext instanceof AbstractIterativeTask) {
+ AbstractIterativeTask<?, ?> iterativeTaskContext = (AbstractIterativeTask<?, ?>) taskContext;
String identifier = iterativeTaskContext.brokerKey();
Object table = SolutionSetBroker.instance().get(identifier);
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index 9e8a81c..eec6597 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
-import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask;
+import org.apache.flink.runtime.iterative.task.AbstractIterativeTask;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.util.EmptyIterator;
@@ -36,9 +36,9 @@ import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.runtime.util.SingleElementIterator;
import org.apache.flink.util.Collector;
-public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
+public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettableDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
- private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
+ private TaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
private CompactingHashTable<IT2> hashTable;
@@ -61,7 +61,7 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
// --------------------------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
+ public void setup(TaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
@@ -104,8 +104,8 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
final TypeComparator<IT2> solutionSetComparator;
// grab a handle to the hash table from the iteration broker
- if (taskContext instanceof AbstractIterativePactTask) {
- AbstractIterativePactTask<?, ?> iterativeTaskContext = (AbstractIterativePactTask<?, ?>) taskContext;
+ if (taskContext instanceof AbstractIterativeTask) {
+ AbstractIterativeTask<?, ?> iterativeTaskContext = (AbstractIterativeTask<?, ?>) taskContext;
String identifier = iterativeTaskContext.brokerKey();
Object table = SolutionSetBroker.instance().get(identifier);
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
index 7e6376f..60bc798 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
@@ -40,19 +40,19 @@ import org.slf4j.LoggerFactory;
* @param <OT> The mapper's output data type.
*/
@SuppressWarnings("deprecation")
-public class CollectorMapDriver<IT, OT> implements PactDriver<GenericCollectorMap<IT, OT>, OT> {
+public class CollectorMapDriver<IT, OT> implements Driver<GenericCollectorMap<IT, OT>, OT> {
private static final Logger LOG = LoggerFactory.getLogger(CollectorMapDriver.class);
- private PactTaskContext<GenericCollectorMap<IT, OT>, OT> taskContext;
+ private TaskContext<GenericCollectorMap<IT, OT>, OT> taskContext;
private volatile boolean running;
private boolean objectReuseEnabled = false;
@Override
- public void setup(PactTaskContext<GenericCollectorMap<IT, OT>, OT> context) {
+ public void setup(TaskContext<GenericCollectorMap<IT, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
index 011c980..c9d84b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
@@ -41,12 +41,12 @@ import org.apache.flink.util.MutableObjectIterator;
*
* @see org.apache.flink.api.common.functions.CrossFunction
*/
-public class CrossDriver<T1, T2, OT> implements PactDriver<CrossFunction<T1, T2, OT>, OT> {
+public class CrossDriver<T1, T2, OT> implements Driver<CrossFunction<T1, T2, OT>, OT> {
private static final Logger LOG = LoggerFactory.getLogger(CrossDriver.class);
- private PactTaskContext<CrossFunction<T1, T2, OT>, OT> taskContext;
+ private TaskContext<CrossFunction<T1, T2, OT>, OT> taskContext;
private MemoryManager memManager;
@@ -70,7 +70,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<CrossFunction<T1, T2,
@Override
- public void setup(PactTaskContext<CrossFunction<T1, T2, OT>, OT> context) {
+ public void setup(TaskContext<CrossFunction<T1, T2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 1002bae..d20bb89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -84,9 +84,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
@Override
public void registerInputOutput() {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Start registering input and output"));
- }
+
+ LOG.debug(getLogString("Start registering input and output"));
// initialize OutputFormat
initOutputFormat();
@@ -99,24 +98,18 @@ public class DataSinkTask<IT> extends AbstractInvokable {
(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Finished registering input and output"));
- }
+ LOG.debug(getLogString("Finished registering input and output"));
}
@Override
public void invoke() throws Exception
{
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Starting data sink operator"));
- }
+ LOG.debug(getLogString("Starting data sink operator"));
if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
((RichOutputFormat) this.format).setRuntimeContext(createRuntimeContext());
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Rich Sink detected. Initializing runtime context."));
- }
+ LOG.debug(getLogString("Rich Sink detected. Initializing runtime context."));
}
ExecutionConfig executionConfig;
@@ -131,9 +124,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
LOG.warn("The execution config returned by the configuration was null");
executionConfig = new ExecutionConfig();
}
- } catch (IOException e) {
- throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e);
- } catch (ClassNotFoundException e) {
+ } catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e);
}
boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
@@ -190,9 +181,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
return;
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Starting to produce output"));
- }
+ LOG.debug(getLogString("Starting to produce output"));
// open
format.open(this.getEnvironment().getIndexInSubtaskGroup(), this.getEnvironment().getNumberOfSubtasks());
@@ -268,18 +257,14 @@ public class DataSinkTask<IT> extends AbstractInvokable {
}
}
- RegularPactTask.clearReaders(new MutableReader[]{inputReader});
+ BatchTask.clearReaders(new MutableReader[]{inputReader});
}
if (!this.taskCanceled) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Finished data sink operator"));
- }
+ LOG.debug(getLogString("Finished data sink operator"));
}
else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Data sink operator cancelled"));
- }
+ LOG.debug(getLogString("Data sink operator cancelled"));
}
}
@@ -304,9 +289,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Cancelling data sink operator"));
- }
+ LOG.debug(getLogString("Cancelling data sink operator"));
}
/**
@@ -402,7 +385,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
* @return The string ready for logging.
*/
private String getLogString(String message) {
- return RegularPactTask.constructLogString(message, this.getEnvironment().getTaskName(), this);
+ return BatchTask.constructLogString(message, this.getEnvironment().getTaskName(), this);
}
public DistributedRuntimeUDFContext createRuntimeContext() {
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 5806e77..801e1a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -80,9 +80,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
public void registerInputOutput() {
initInputFormat();
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Start registering input and output"));
- }
+ LOG.debug(getLogString("Start registering input and output"));
try {
initOutputs(getUserCodeClassLoader());
@@ -91,24 +89,18 @@ public class DataSourceTask<OT> extends AbstractInvokable {
ex.getMessage(), ex);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Finished registering input and output"));
- }
+ LOG.debug(getLogString("Finished registering input and output"));
}
@Override
public void invoke() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Starting data source operator"));
- }
+ LOG.debug(getLogString("Starting data source operator"));
if(RichInputFormat.class.isAssignableFrom(this.format.getClass())){
((RichInputFormat) this.format).setRuntimeContext(createRuntimeContext());
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
- }
+ LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
}
ExecutionConfig executionConfig;
@@ -123,23 +115,19 @@ public class DataSourceTask<OT> extends AbstractInvokable {
LOG.warn("ExecutionConfig from job configuration is null. Creating empty config");
executionConfig = new ExecutionConfig();
}
- } catch (IOException e) {
- throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: ", e);
- } catch (ClassNotFoundException e) {
+ } catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: ", e);
}
boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
- if (LOG.isDebugEnabled()) {
- LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
- }
+ LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
try {
// start all chained tasks
- RegularPactTask.openChainedTasks(this.chainedTasks, this);
+ BatchTask.openChainedTasks(this.chainedTasks, this);
// get input splits to read
final Iterator<InputSplit> splitIterator = getInputSplits();
@@ -150,18 +138,14 @@ public class DataSourceTask<OT> extends AbstractInvokable {
// get start and end
final InputSplit split = splitIterator.next();
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Opening input split " + split.toString()));
- }
+ LOG.debug(getLogString("Opening input split " + split.toString()));
final InputFormat<OT, InputSplit> format = this.format;
// open input format
format.open(split);
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Starting to read input from split " + split.toString()));
- }
+ LOG.debug(getLogString("Starting to read input from split " + split.toString()));
try {
final Collector<OT> output = this.output;
@@ -201,7 +185,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
this.output.close();
// close all chained tasks letting them report failure
- RegularPactTask.closeChainedTasks(this.chainedTasks, this);
+ BatchTask.closeChainedTasks(this.chainedTasks, this);
}
catch (Exception ex) {
@@ -210,7 +194,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
this.format.close();
} catch (Throwable ignored) {}
- RegularPactTask.cancelChainedTasks(this.chainedTasks);
+ BatchTask.cancelChainedTasks(this.chainedTasks);
ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
@@ -220,30 +204,24 @@ public class DataSourceTask<OT> extends AbstractInvokable {
}
else if (!this.taskCanceled) {
// drop exception, if the task was canceled
- RegularPactTask.logAndThrowException(ex, this);
+ BatchTask.logAndThrowException(ex, this);
}
} finally {
- RegularPactTask.clearWriters(eventualOutputs);
+ BatchTask.clearWriters(eventualOutputs);
}
if (!this.taskCanceled) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Finished data source operator"));
- }
+ LOG.debug(getLogString("Finished data source operator"));
}
else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Data source operator cancelled"));
- }
+ LOG.debug(getLogString("Data source operator cancelled"));
}
}
@Override
public void cancel() throws Exception {
this.taskCanceled = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug(getLogString("Cancelling data source operator"));
- }
+ LOG.debug(getLogString("Cancelling data source operator"));
}
/**
@@ -303,7 +281,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
final AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
final AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
- this.output = RegularPactTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs,
+ this.output = BatchTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs,
getExecutionConfig(), reporter, getEnvironment().getAccumulatorRegistry().getUserMap());
}
@@ -331,7 +309,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
* @return The string ready for logging.
*/
private String getLogString(String message, String taskName) {
- return RegularPactTask.constructLogString(message, taskName, this);
+ return BatchTask.constructLogString(message, taskName, this);
}
private Iterator<InputSplit> getInputSplits() {
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java
new file mode 100644
index 0000000..8e8795e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/Driver.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * The interface to be implemented by all drivers that run alone (or as the primary driver) in a task.
+ * A driver implements the actual code to perform a batch operation, like <i>map()</i>,
+ * <i>reduce()</i>, <i>join()</i>, or <i>coGroup()</i>.
+ *
+ * @see TaskContext
+ *
+ * @param <S> The type of stub driven by this driver.
+ * @param <OT> The data type of the records produced by this driver.
+ */
+public interface Driver<S extends Function, OT> {
+
+ void setup(TaskContext<S, OT> context);
+
+ /**
+ * Gets the number of inputs that the task has.
+ *
+ * @return The number of inputs.
+ */
+ int getNumberOfInputs();
+
+ /**
+ * Gets the number of comparators required for this driver.
+ *
+ * @return The number of comparators required for this driver.
+ */
+ int getNumberOfDriverComparators();
+
+ /**
+ * Gets the class of the stub type that is run by this task. For example, a <tt>MapTask</tt> should return
+ * <code>MapFunction.class</code>.
+ *
+ * @return The class of the stub type run by the task.
+ */
+ Class<S> getStubType();
+
+ /**
+ * This method is called before the user code is opened. An exception thrown by this method
+ * signals failure of the task.
+ *
+ * @throws Exception Exceptions may be forwarded and signal task failure.
+ */
+ void prepare() throws Exception;
+
+ /**
+ * The main operation method of the task. It should call the user code with the data subsets until
+ * the input is depleted.
+ *
+ * @throws Exception Any exception thrown by this method signals task failure. Because exceptions in the user
+ * code typically signal situations where this instance is unable to proceed, exceptions
+ * from the user code should be forwarded.
+ */
+ void run() throws Exception;
+
+ /**
+ * This method is invoked in all cases (clean termination as well as exceptions) at the end of the task's operation.
+ *
+ * @throws Exception Exceptions may be forwarded.
+ */
+ void cleanup() throws Exception;
+
+ /**
+ * This method is invoked when the driver must be aborted in mid-processing. It is invoked asynchronously by a different thread.
+ *
+ * @throws Exception Exceptions may be forwarded.
+ */
+ void cancel() throws Exception;
+}
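To make the lifecycle concrete (setup, then prepare, run, cleanup, with cancel possibly invoked asynchronously), here is a minimal identity-map driver sketched against this interface. It assumes the TaskContext accessors getInput(int), getStub(), and getOutputCollector() that the renamed drivers in this commit rely on; the class itself is an illustration, not part of the commit:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.MutableObjectIterator;

    public class IdentityMapDriver<T> implements Driver<MapFunction<T, T>, T> {

        private TaskContext<MapFunction<T, T>, T> taskContext;

        private volatile boolean running;

        @Override
        public void setup(TaskContext<MapFunction<T, T>, T> context) {
            this.taskContext = context;
            this.running = true;
        }

        @Override
        public int getNumberOfInputs() {
            return 1;
        }

        @Override
        public int getNumberOfDriverComparators() {
            return 0; // plain mapping needs no sort or grouping comparators
        }

        @Override
        @SuppressWarnings("unchecked")
        public Class<MapFunction<T, T>> getStubType() {
            return (Class<MapFunction<T, T>>) (Class<?>) MapFunction.class;
        }

        @Override
        public void prepare() {
            // nothing to acquire before the user code is opened
        }

        @Override
        public void run() throws Exception {
            final MutableObjectIterator<T> input = this.taskContext.getInput(0);
            final MapFunction<T, T> function = this.taskContext.getStub();
            final Collector<T> output = this.taskContext.getOutputCollector();

            // call the user code until the input is depleted or the task is canceled
            T record;
            while (this.running && (record = input.next()) != null) {
                output.collect(function.map(record));
            }
        }

        @Override
        public void cleanup() {
            // nothing to release
        }

        @Override
        public void cancel() {
            // invoked asynchronously by another thread; run() observes the flag and stops
            this.running = false;
        }
    }
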
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 0aceae2..bc7bee5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -115,7 +115,7 @@ public enum DriverStrategy {
// --------------------------------------------------------------------------------------------
- private final Class<? extends PactDriver<?, ?>> driverClass;
+ private final Class<? extends Driver<?, ?>> driverClass;
private final Class<? extends ChainedDriver<?, ?>> pushChainDriver;
@@ -129,11 +129,11 @@ public enum DriverStrategy {
@SuppressWarnings("unchecked")
private DriverStrategy(
- @SuppressWarnings("rawtypes") Class<? extends PactDriver> driverClass,
+ @SuppressWarnings("rawtypes") Class<? extends Driver> driverClass,
@SuppressWarnings("rawtypes") Class<? extends ChainedDriver> pushChainDriverClass,
DamBehavior dam, int numComparator)
{
- this.driverClass = (Class<? extends PactDriver<?, ?>>) driverClass;
+ this.driverClass = (Class<? extends Driver<?, ?>>) driverClass;
this.pushChainDriver = (Class<? extends ChainedDriver<?, ?>>) pushChainDriverClass;
this.numInputs = 1;
this.dam1 = dam;
@@ -143,11 +143,11 @@ public enum DriverStrategy {
@SuppressWarnings("unchecked")
private DriverStrategy(
- @SuppressWarnings("rawtypes") Class<? extends PactDriver> driverClass,
+ @SuppressWarnings("rawtypes") Class<? extends Driver> driverClass,
@SuppressWarnings("rawtypes") Class<? extends ChainedDriver> pushChainDriverClass,
DamBehavior firstDam, DamBehavior secondDam, int numComparator)
{
- this.driverClass = (Class<? extends PactDriver<?, ?>>) driverClass;
+ this.driverClass = (Class<? extends Driver<?, ?>>) driverClass;
this.pushChainDriver = (Class<? extends ChainedDriver<?, ?>>) pushChainDriverClass;
this.numInputs = 2;
this.dam1 = firstDam;
@@ -157,7 +157,7 @@ public enum DriverStrategy {
// --------------------------------------------------------------------------------------------
- public Class<? extends PactDriver<?, ?>> getDriverClass() {
+ public Class<? extends Driver<?, ?>> getDriverClass() {
return this.driverClass;
}
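The enum exposes the renamed driver class so that the task harness can instantiate it reflectively. A brief sketch of that step, relying on the public null-ary constructor convention noted earlier; the getDriverStrategy() accessor on TaskConfig is an assumption here, and the fragment belongs in a method declaring throws Exception:

    DriverStrategy strategy = config.getDriverStrategy();             // assumed TaskConfig accessor
    Class<? extends Driver<?, ?>> driverClass = strategy.getDriverClass();
    Driver<?, ?> driver = driverClass.newInstance();                  // requires a public null-ary constructor
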
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
index fd245bd..c29923b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
@@ -39,18 +39,18 @@ import org.slf4j.LoggerFactory;
* @param <IT> The mapper's input data type.
* @param <OT> The mapper's output data type.
*/
-public class FlatMapDriver<IT, OT> implements PactDriver<FlatMapFunction<IT, OT>, OT> {
+public class FlatMapDriver<IT, OT> implements Driver<FlatMapFunction<IT, OT>, OT> {
private static final Logger LOG = LoggerFactory.getLogger(FlatMapDriver.class);
- private PactTaskContext<FlatMapFunction<IT, OT>, OT> taskContext;
+ private TaskContext<FlatMapFunction<IT, OT>, OT> taskContext;
private volatile boolean running;
private boolean objectReuseEnabled = false;
@Override
- public void setup(PactTaskContext<FlatMapFunction<IT, OT>, OT> context) {
+ public void setup(TaskContext<FlatMapFunction<IT, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}