Posted to commits@flink.apache.org by hs...@apache.org on 2015/10/06 17:14:24 UTC

[4/5] flink git commit: [FLINK-2815] [REFACTOR] Remove Pact from class and file names since it is no longer a valid reference

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
deleted file mode 100644
index 9cb045f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.iterative.task;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.operators.util.JoinHashMap;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.InputViewIterator;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
-import org.apache.flink.runtime.iterative.concurrent.Broker;
-import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
-import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
-import org.apache.flink.runtime.iterative.event.TerminationEvent;
-import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
-import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
-import org.apache.flink.runtime.operators.RegularPactTask;
-import org.apache.flink.runtime.operators.hash.CompactingHashTable;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-/**
- * The head is responsible for coordinating an iteration and can run a
- * {@link org.apache.flink.runtime.operators.PactDriver} inside. It will read
- * the initial input and establish a {@link BlockingBackChannel} to the iteration's tail. After successfully processing
- * the input, it will send EndOfSuperstep events to its outputs. It must also be connected to a
- * synchronization task and after each superstep, it will wait
- * until it receives an {@link AllWorkersDoneEvent} from the sync, which signals that all other heads have also finished
- * their iteration. Starting with
- * the second iteration, the input for the head is the output of the tail, transmitted through the backchannel. Once the
- * iteration is done, the head
- * will send a {@link TerminationEvent} to all its connected tasks, signaling them to shut down.
- * <p>
- * Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the
- * step function. - The next m output gates go to the tasks that consume the final solution. - The last output gate
- * connects to the synchronization task.
- * 
- * @param <X>
- *        The type of the bulk partial solution / solution set and the final output.
- * @param <Y>
- *        The type of the feed-back data set (bulk partial solution / workset). For bulk iterations, {@code Y} is the
- *        same as {@code X}
- */
-public class IterationHeadPactTask<X, Y, S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
-
-	private static final Logger log = LoggerFactory.getLogger(IterationHeadPactTask.class);
-
-	private Collector<X> finalOutputCollector;
-
-	private TypeSerializerFactory<Y> feedbackTypeSerializer;
-
-	private TypeSerializerFactory<X> solutionTypeSerializer;
-
-	private ResultPartitionWriter toSync;
-
-	private int feedbackDataInput; // workset or bulk partial solution
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	protected int getNumTaskInputs() {
-		// this task has an additional input in the workset case for the initial solution set
-		boolean isWorkset = config.getIsWorksetIteration();
-		return driver.getNumberOfInputs() + (isWorkset ? 1 : 0);
-	}
-
-	@Override
-	protected void initOutputs() throws Exception {
-		// initialize the regular outputs first (the ones into the step function).
-		super.initOutputs();
-
-		// at this time, the outputs to the step function are created
-		// add the outputs for the final solution
-		List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
-		final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
-		final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
-		AccumulatorRegistry.Reporter reporter = getEnvironment().getAccumulatorRegistry().getReadWriteReporter();
-		this.finalOutputCollector = RegularPactTask.getOutputCollector(this, finalOutConfig,
-			userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter);
-
-		// sanity check the setup
-		final int writersIntoStepFunction = this.eventualOutputs.size();
-		final int writersIntoFinalResult = finalOutputWriters.size();
-		final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
-
-		if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
-			throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
-		}
-		// now, we can instantiate the sync gate
-		this.toSync = getEnvironment().getWriter(syncGateIndex);
-	}
-
-	/**
-	 * The iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and
-	 * hands it to the iteration tail via a {@link Broker} singleton
-	 **/
-	private BlockingBackChannel initBackChannel() throws Exception {
-
-		/* get the size of the memory available to the backchannel */
-		int backChannelMemoryPages = getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory());
-
-		/* allocate the memory available to the backchannel */
-		List<MemorySegment> segments = new ArrayList<MemorySegment>();
-		int segmentSize = getMemoryManager().getPageSize();
-		getMemoryManager().allocatePages(this, segments, backChannelMemoryPages);
-
-		/* instantiate the backchannel */
-		BlockingBackChannel backChannel = new BlockingBackChannel(new SerializedUpdateBuffer(segments, segmentSize,
-			getIOManager()));
-
-		/* hand the backchannel over to the iteration tail */
-		Broker<BlockingBackChannel> broker = BlockingBackChannelBroker.instance();
-		broker.handIn(brokerKey(), backChannel);
-
-		return backChannel;
-	}
-	
-	private <BT> CompactingHashTable<BT> initCompactingHashTable() throws Exception {
-		// get some memory
-		double hashjoinMemorySize = config.getRelativeSolutionSetMemory();
-		final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
-
-		TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer(userCodeClassLoader);
-		TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator(userCodeClassLoader);
-	
-		TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
-		TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
-
-		CompactingHashTable<BT> hashTable = null;
-		List<MemorySegment> memSegments = null;
-		boolean success = false;
-		try {
-			int numPages = getMemoryManager().computeNumberOfPages(hashjoinMemorySize);
-			memSegments = getMemoryManager().allocatePages(getOwningNepheleTask(), numPages);
-			hashTable = new CompactingHashTable<BT>(solutionTypeSerializer, solutionTypeComparator, memSegments);
-			success = true;
-			return hashTable;
-		} finally {
-			if (!success) {
-				if (hashTable != null) {
-					try {
-						hashTable.close();
-					} catch (Throwable t) {
-						log.error("Error closing the solution set hash table after unsuccessful creation.", t);
-					}
-				}
-				if (memSegments != null) {
-					try {
-						getMemoryManager().release(memSegments);
-					} catch (Throwable t) {
-						log.error("Error freeing memory after error during solution set hash table creation.", t);
-					}
-				}
-			}
-		}
-	}
-	
-	private <BT> JoinHashMap<BT> initJoinHashMap() {
-		TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer
-				(getUserCodeClassLoader());
-		TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator
-				(getUserCodeClassLoader());
-	
-		TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
-		TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
-		
-		return new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
-	}
-	
-	private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
-		solutionSet.open();
-		solutionSet.buildTableWithUniqueKey(solutionSetInput);
-	}
-	
-	private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
-		TypeSerializer<X> serializer = solutionTypeSerializer.getSerializer();
-		
-		X next;
-		while ((next = solutionSetInput.next(serializer.createInstance())) != null) {
-			solutionSet.insertOrReplace(next);
-		}
-	}
-
-	private SuperstepBarrier initSuperstepBarrier() {
-		SuperstepBarrier barrier = new SuperstepBarrier(getUserCodeClassLoader());
-		this.toSync.subscribeToEvent(barrier, AllWorkersDoneEvent.class);
-		this.toSync.subscribeToEvent(barrier, TerminationEvent.class);
-		return barrier;
-	}
-
-	@Override
-	public void run() throws Exception {
-		final String brokerKey = brokerKey();
-		final int workerIndex = getEnvironment().getIndexInSubtaskGroup();
-		
-		final boolean objectSolutionSet = config.isSolutionSetUnmanaged();
-
-		CompactingHashTable<X> solutionSet = null; // if workset iteration
-		JoinHashMap<X> solutionSetObjectMap = null; // if workset iteration with unmanaged solution set
-		
-		boolean waitForSolutionSetUpdate = config.getWaitForSolutionSetUpdate();
-		boolean isWorksetIteration = config.getIsWorksetIteration();
-
-		try {
-			/* used for receiving the current iteration result from iteration tail */
-			SuperstepKickoffLatch nextStepKickoff = new SuperstepKickoffLatch();
-			SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff);
-			
-			BlockingBackChannel backChannel = initBackChannel();
-			SuperstepBarrier barrier = initSuperstepBarrier();
-			SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;
-
-			feedbackDataInput = config.getIterationHeadPartialSolutionOrWorksetInputIndex();
-			feedbackTypeSerializer = this.getInputSerializer(feedbackDataInput);
-			excludeFromReset(feedbackDataInput);
-
-			int initialSolutionSetInput;
-			if (isWorksetIteration) {
-				initialSolutionSetInput = config.getIterationHeadSolutionSetInputIndex();
-				solutionTypeSerializer = config.getSolutionSetSerializer(getUserCodeClassLoader());
-
-				// setup the index for the solution set
-				@SuppressWarnings("unchecked")
-				MutableObjectIterator<X> solutionSetInput = (MutableObjectIterator<X>) createInputIterator(inputReaders[initialSolutionSetInput], solutionTypeSerializer);
-				
-				// read the initial solution set
-				if (objectSolutionSet) {
-					solutionSetObjectMap = initJoinHashMap();
-					readInitialSolutionSet(solutionSetObjectMap, solutionSetInput);
-					SolutionSetBroker.instance().handIn(brokerKey, solutionSetObjectMap);
-				} else {
-					solutionSet = initCompactingHashTable();
-					readInitialSolutionSet(solutionSet, solutionSetInput);
-					SolutionSetBroker.instance().handIn(brokerKey, solutionSet);
-				}
-
-				if (waitForSolutionSetUpdate) {
-					solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
-					SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
-				}
-			}
-			else {
-				// bulk iteration case
-				@SuppressWarnings("unchecked")
-				TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) feedbackTypeSerializer;
-				solutionTypeSerializer = solSer;
-				
-				// in the bulk case, waitForSolutionSetUpdate refers to a termination criterion tail
-				if (waitForSolutionSetUpdate) {
-					solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
-					SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
-				}
-			}
-
-			// instantiate all aggregators and register them at the iteration global registry
-			RuntimeAggregatorRegistry aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators
-					(getUserCodeClassLoader()));
-			IterationAggregatorBroker.instance().handIn(brokerKey, aggregatorRegistry);
-
-			DataInputView superstepResult = null;
-
-			while (this.running && !terminationRequested()) {
-
-				if (log.isInfoEnabled()) {
-					log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
-				}
-
-				barrier.setup();
-
-				if (waitForSolutionSetUpdate) {
-					solutionSetUpdateBarrier.setup();
-				}
-
-				if (!inFirstIteration()) {
-					feedBackSuperstepResult(superstepResult);
-				}
-
-				super.run();
-
-				// signal to connected tasks that we are done with the superstep
-				sendEndOfSuperstepToAllIterationOutputs();
-
-				if (waitForSolutionSetUpdate) {
-					solutionSetUpdateBarrier.waitForSolutionSetUpdate();
-				}
-
-				// blocking call to wait for the result
-				superstepResult = backChannel.getReadEndAfterSuperstepEnded();
-				if (log.isInfoEnabled()) {
-					log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
-				}
-
-				sendEventToSync(new WorkerDoneEvent(workerIndex, aggregatorRegistry.getAllAggregators()));
-
-				if (log.isInfoEnabled()) {
-					log.info(formatLogString("waiting for other workers in iteration [" + currentIteration() + "]"));
-				}
-
-				barrier.waitForOtherWorkers();
-
-				if (barrier.terminationSignaled()) {
-					if (log.isInfoEnabled()) {
-						log.info(formatLogString("head received termination request in iteration ["
-							+ currentIteration()
-							+ "]"));
-					}
-					requestTermination();
-					nextStepKickoff.signalTermination();
-				} else {
-					incrementIterationCounter();
-
-					String[] globalAggregateNames = barrier.getAggregatorNames();
-					Value[] globalAggregates = barrier.getAggregates();
-					aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
-					
-					nextStepKickoff.triggerNextSuperstep();
-				}
-			}
-
-			if (log.isInfoEnabled()) {
-				log.info(formatLogString("streaming out final result after [" + currentIteration() + "] iterations"));
-			}
-
-			if (isWorksetIteration) {
-				if (objectSolutionSet) {
-					streamSolutionSetToFinalOutput(solutionSetObjectMap);
-				} else {
-					streamSolutionSetToFinalOutput(solutionSet);
-				}
-			} else {
-				streamOutFinalOutputBulk(new InputViewIterator<X>(superstepResult, this.solutionTypeSerializer.getSerializer()));
-			}
-
-			this.finalOutputCollector.close();
-
-		} finally {
-			// make sure we unregister everything from the broker:
-			// - backchannel
-			// - aggregator registry
-			// - solution set index
-			IterationAggregatorBroker.instance().remove(brokerKey);
-			BlockingBackChannelBroker.instance().remove(brokerKey);
-			SuperstepKickoffLatchBroker.instance().remove(brokerKey);
-			SolutionSetBroker.instance().remove(brokerKey);
-			SolutionSetUpdateBarrierBroker.instance().remove(brokerKey);
-
-			if (solutionSet != null) {
-				solutionSet.close();
-			}
-		}
-	}
-
-	private void streamOutFinalOutputBulk(MutableObjectIterator<X> results) throws IOException {
-		final Collector<X> out = this.finalOutputCollector;
-		X record = this.solutionTypeSerializer.getSerializer().createInstance();
-
-		while ((record = results.next(record)) != null) {
-			out.collect(record);
-		}
-	}
-	
-	private void streamSolutionSetToFinalOutput(CompactingHashTable<X> hashTable) throws IOException {
-		final MutableObjectIterator<X> results = hashTable.getEntryIterator();
-		final Collector<X> output = this.finalOutputCollector;
-		X record = solutionTypeSerializer.getSerializer().createInstance();
-
-		while ((record = results.next(record)) != null) {
-			output.collect(record);
-		}
-	}
-	
-	@SuppressWarnings("unchecked")
-	private void streamSolutionSetToFinalOutput(JoinHashMap<X> solutionSet) throws IOException {
-		final Collector<X> output = this.finalOutputCollector;
-		for (Object e : solutionSet.values()) {
-			output.collect((X) e);
-		}
-	}
-
-	private void feedBackSuperstepResult(DataInputView superstepResult) {
-		this.inputs[this.feedbackDataInput] =
-			new InputViewIterator<Y>(superstepResult, this.feedbackTypeSerializer.getSerializer());
-	}
-
-	private void sendEndOfSuperstepToAllIterationOutputs() throws IOException, InterruptedException {
-		if (log.isDebugEnabled()) {
-			log.debug(formatLogString("Sending end-of-superstep to all iteration outputs."));
-		}
-
-		for (RecordWriter<?> eventualOutput : this.eventualOutputs) {
-			eventualOutput.sendEndOfSuperstep();
-		}
-	}
-
-	private void sendEventToSync(WorkerDoneEvent event) throws IOException, InterruptedException {
-		if (log.isInfoEnabled()) {
-			log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync"));
-		}
-		this.toSync.writeEventToAllChannels(event);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
new file mode 100644
index 0000000..c6268f4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.task;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.operators.Driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.operators.util.JoinHashMap;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
+import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
+import org.apache.flink.runtime.iterative.concurrent.Broker;
+import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
+import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
+import org.apache.flink.runtime.iterative.event.TerminationEvent;
+import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
+import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.hash.CompactingHashTable;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * The head is responsible for coordinating an iteration and can run a
+ * {@link Driver} inside. It will read
+ * the initial input and establish a {@link BlockingBackChannel} to the iteration's tail. After successfully processing
+ * the input, it will send EndOfSuperstep events to its outputs. It must also be connected to a
+ * synchronization task and after each superstep, it will wait
+ * until it receives an {@link AllWorkersDoneEvent} from the sync, which signals that all other heads have also finished
+ * their iteration. Starting with
+ * the second iteration, the input for the head is the output of the tail, transmitted through the backchannel. Once the
+ * iteration is done, the head
+ * will send a {@link TerminationEvent} to all its connected tasks, signaling them to shut down.
+ * <p>
+ * Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the
+ * step function. - The next m output gates go to the tasks that consume the final solution. - The last output gate
+ * connects to the synchronization task.
+ * 
+ * @param <X>
+ *        The type of the bulk partial solution / solution set and the final output.
+ * @param <Y>
+ *        The type of the feed-back data set (bulk partial solution / workset). For bulk iterations, {@code Y} is the
+ *        same as {@code X}
+ */
+public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIterativeTask<S, OT> {
+
+	private static final Logger log = LoggerFactory.getLogger(IterationHeadTask.class);
+
+	private Collector<X> finalOutputCollector;
+
+	private TypeSerializerFactory<Y> feedbackTypeSerializer;
+
+	private TypeSerializerFactory<X> solutionTypeSerializer;
+
+	private ResultPartitionWriter toSync;
+
+	private int feedbackDataInput; // workset or bulk partial solution
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	protected int getNumTaskInputs() {
+		// this task has an additional input in the workset case for the initial solution set
+		boolean isWorkset = config.getIsWorksetIteration();
+		return driver.getNumberOfInputs() + (isWorkset ? 1 : 0);
+	}
+
+	@Override
+	protected void initOutputs() throws Exception {
+		// initialize the regular outputs first (the ones into the step function).
+		super.initOutputs();
+
+		// at this time, the outputs to the step function are created
+		// add the outputs for the final solution
+		List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
+		final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
+		final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
+		AccumulatorRegistry.Reporter reporter = getEnvironment().getAccumulatorRegistry().getReadWriteReporter();
+		this.finalOutputCollector = BatchTask.getOutputCollector(this, finalOutConfig,
+				userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter);
+
+		// sanity check the setup
+		final int writersIntoStepFunction = this.eventualOutputs.size();
+		final int writersIntoFinalResult = finalOutputWriters.size();
+		final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
+
+		if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
+			throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
+		}
+		// now, we can instantiate the sync gate
+		this.toSync = getEnvironment().getWriter(syncGateIndex);
+	}
+
+	/**
+	 * The iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and
+	 * hands it to the iteration tail via a {@link Broker} singleton
+	 **/
+	private BlockingBackChannel initBackChannel() throws Exception {
+
+		/* get the size of the memory available to the backchannel */
+		int backChannelMemoryPages = getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory());
+
+		/* allocate the memory available to the backchannel */
+		List<MemorySegment> segments = new ArrayList<MemorySegment>();
+		int segmentSize = getMemoryManager().getPageSize();
+		getMemoryManager().allocatePages(this, segments, backChannelMemoryPages);
+
+		/* instantiate the backchannel */
+		BlockingBackChannel backChannel = new BlockingBackChannel(new SerializedUpdateBuffer(segments, segmentSize,
+			getIOManager()));
+
+		/* hand the backchannel over to the iteration tail */
+		Broker<BlockingBackChannel> broker = BlockingBackChannelBroker.instance();
+		broker.handIn(brokerKey(), backChannel);
+
+		return backChannel;
+	}
+	
+	private <BT> CompactingHashTable<BT> initCompactingHashTable() throws Exception {
+		// get some memory
+		double hashjoinMemorySize = config.getRelativeSolutionSetMemory();
+		final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
+
+		TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer(userCodeClassLoader);
+		TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator(userCodeClassLoader);
+	
+		TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
+		TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
+
+		CompactingHashTable<BT> hashTable = null;
+		List<MemorySegment> memSegments = null;
+		boolean success = false;
+		try {
+			int numPages = getMemoryManager().computeNumberOfPages(hashjoinMemorySize);
+			memSegments = getMemoryManager().allocatePages(getOwningNepheleTask(), numPages);
+			hashTable = new CompactingHashTable<BT>(solutionTypeSerializer, solutionTypeComparator, memSegments);
+			success = true;
+			return hashTable;
+		} finally {
+			if (!success) {
+				if (hashTable != null) {
+					try {
+						hashTable.close();
+					} catch (Throwable t) {
+						log.error("Error closing the solution set hash table after unsuccessful creation.", t);
+					}
+				}
+				if (memSegments != null) {
+					try {
+						getMemoryManager().release(memSegments);
+					} catch (Throwable t) {
+						log.error("Error freeing memory after error during solution set hash table creation.", t);
+					}
+				}
+			}
+		}
+	}
+	
+	private <BT> JoinHashMap<BT> initJoinHashMap() {
+		TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer
+				(getUserCodeClassLoader());
+		TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator
+				(getUserCodeClassLoader());
+	
+		TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
+		TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
+		
+		return new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
+	}
+	
+	private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
+		solutionSet.open();
+		solutionSet.buildTableWithUniqueKey(solutionSetInput);
+	}
+	
+	private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
+		TypeSerializer<X> serializer = solutionTypeSerializer.getSerializer();
+		
+		X next;
+		while ((next = solutionSetInput.next(serializer.createInstance())) != null) {
+			solutionSet.insertOrReplace(next);
+		}
+	}
+
+	private SuperstepBarrier initSuperstepBarrier() {
+		SuperstepBarrier barrier = new SuperstepBarrier(getUserCodeClassLoader());
+		this.toSync.subscribeToEvent(barrier, AllWorkersDoneEvent.class);
+		this.toSync.subscribeToEvent(barrier, TerminationEvent.class);
+		return barrier;
+	}
+
+	@Override
+	public void run() throws Exception {
+		final String brokerKey = brokerKey();
+		final int workerIndex = getEnvironment().getIndexInSubtaskGroup();
+		
+		final boolean objectSolutionSet = config.isSolutionSetUnmanaged();
+
+		CompactingHashTable<X> solutionSet = null; // if workset iteration
+		JoinHashMap<X> solutionSetObjectMap = null; // if workset iteration with unmanaged solution set
+		
+		boolean waitForSolutionSetUpdate = config.getWaitForSolutionSetUpdate();
+		boolean isWorksetIteration = config.getIsWorksetIteration();
+
+		try {
+			/* used for receiving the current iteration result from iteration tail */
+			SuperstepKickoffLatch nextStepKickoff = new SuperstepKickoffLatch();
+			SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff);
+			
+			BlockingBackChannel backChannel = initBackChannel();
+			SuperstepBarrier barrier = initSuperstepBarrier();
+			SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;
+
+			feedbackDataInput = config.getIterationHeadPartialSolutionOrWorksetInputIndex();
+			feedbackTypeSerializer = this.getInputSerializer(feedbackDataInput);
+			excludeFromReset(feedbackDataInput);
+
+			int initialSolutionSetInput;
+			if (isWorksetIteration) {
+				initialSolutionSetInput = config.getIterationHeadSolutionSetInputIndex();
+				solutionTypeSerializer = config.getSolutionSetSerializer(getUserCodeClassLoader());
+
+				// setup the index for the solution set
+				@SuppressWarnings("unchecked")
+				MutableObjectIterator<X> solutionSetInput = (MutableObjectIterator<X>) createInputIterator(inputReaders[initialSolutionSetInput], solutionTypeSerializer);
+				
+				// read the initial solution set
+				if (objectSolutionSet) {
+					solutionSetObjectMap = initJoinHashMap();
+					readInitialSolutionSet(solutionSetObjectMap, solutionSetInput);
+					SolutionSetBroker.instance().handIn(brokerKey, solutionSetObjectMap);
+				} else {
+					solutionSet = initCompactingHashTable();
+					readInitialSolutionSet(solutionSet, solutionSetInput);
+					SolutionSetBroker.instance().handIn(brokerKey, solutionSet);
+				}
+
+				if (waitForSolutionSetUpdate) {
+					solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
+					SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
+				}
+			}
+			else {
+				// bulk iteration case
+				@SuppressWarnings("unchecked")
+				TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) feedbackTypeSerializer;
+				solutionTypeSerializer = solSer;
+				
+				// in the bulk case, waitForSolutionSetUpdate refers to a termination criterion tail
+				if (waitForSolutionSetUpdate) {
+					solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
+					SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
+				}
+			}
+
+			// instantiate all aggregators and register them at the iteration global registry
+			RuntimeAggregatorRegistry aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators
+					(getUserCodeClassLoader()));
+			IterationAggregatorBroker.instance().handIn(brokerKey, aggregatorRegistry);
+
+			DataInputView superstepResult = null;
+
+			while (this.running && !terminationRequested()) {
+
+				if (log.isInfoEnabled()) {
+					log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
+				}
+
+				barrier.setup();
+
+				if (waitForSolutionSetUpdate) {
+					solutionSetUpdateBarrier.setup();
+				}
+
+				if (!inFirstIteration()) {
+					feedBackSuperstepResult(superstepResult);
+				}
+
+				super.run();
+
+				// signal to connected tasks that we are done with the superstep
+				sendEndOfSuperstepToAllIterationOutputs();
+
+				if (waitForSolutionSetUpdate) {
+					solutionSetUpdateBarrier.waitForSolutionSetUpdate();
+				}
+
+				// blocking call to wait for the result
+				superstepResult = backChannel.getReadEndAfterSuperstepEnded();
+				if (log.isInfoEnabled()) {
+					log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
+				}
+
+				sendEventToSync(new WorkerDoneEvent(workerIndex, aggregatorRegistry.getAllAggregators()));
+
+				if (log.isInfoEnabled()) {
+					log.info(formatLogString("waiting for other workers in iteration [" + currentIteration() + "]"));
+				}
+
+				barrier.waitForOtherWorkers();
+
+				if (barrier.terminationSignaled()) {
+					if (log.isInfoEnabled()) {
+						log.info(formatLogString("head received termination request in iteration ["
+							+ currentIteration()
+							+ "]"));
+					}
+					requestTermination();
+					nextStepKickoff.signalTermination();
+				} else {
+					incrementIterationCounter();
+
+					String[] globalAggregateNames = barrier.getAggregatorNames();
+					Value[] globalAggregates = barrier.getAggregates();
+					aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
+					
+					nextStepKickoff.triggerNextSuperstep();
+				}
+			}
+
+			if (log.isInfoEnabled()) {
+				log.info(formatLogString("streaming out final result after [" + currentIteration() + "] iterations"));
+			}
+
+			if (isWorksetIteration) {
+				if (objectSolutionSet) {
+					streamSolutionSetToFinalOutput(solutionSetObjectMap);
+				} else {
+					streamSolutionSetToFinalOutput(solutionSet);
+				}
+			} else {
+				streamOutFinalOutputBulk(new InputViewIterator<X>(superstepResult, this.solutionTypeSerializer.getSerializer()));
+			}
+
+			this.finalOutputCollector.close();
+
+		} finally {
+			// make sure we unregister everything from the broker:
+			// - backchannel
+			// - aggregator registry
+			// - solution set index
+			IterationAggregatorBroker.instance().remove(brokerKey);
+			BlockingBackChannelBroker.instance().remove(brokerKey);
+			SuperstepKickoffLatchBroker.instance().remove(brokerKey);
+			SolutionSetBroker.instance().remove(brokerKey);
+			SolutionSetUpdateBarrierBroker.instance().remove(brokerKey);
+
+			if (solutionSet != null) {
+				solutionSet.close();
+			}
+		}
+	}
+
+	private void streamOutFinalOutputBulk(MutableObjectIterator<X> results) throws IOException {
+		final Collector<X> out = this.finalOutputCollector;
+		X record = this.solutionTypeSerializer.getSerializer().createInstance();
+
+		while ((record = results.next(record)) != null) {
+			out.collect(record);
+		}
+	}
+	
+	private void streamSolutionSetToFinalOutput(CompactingHashTable<X> hashTable) throws IOException {
+		final MutableObjectIterator<X> results = hashTable.getEntryIterator();
+		final Collector<X> output = this.finalOutputCollector;
+		X record = solutionTypeSerializer.getSerializer().createInstance();
+
+		while ((record = results.next(record)) != null) {
+			output.collect(record);
+		}
+	}
+	
+	@SuppressWarnings("unchecked")
+	private void streamSolutionSetToFinalOutput(JoinHashMap<X> solutionSet) throws IOException {
+		final Collector<X> output = this.finalOutputCollector;
+		for (Object e : solutionSet.values()) {
+			output.collect((X) e);
+		}
+	}
+
+	private void feedBackSuperstepResult(DataInputView superstepResult) {
+		this.inputs[this.feedbackDataInput] =
+			new InputViewIterator<Y>(superstepResult, this.feedbackTypeSerializer.getSerializer());
+	}
+
+	private void sendEndOfSuperstepToAllIterationOutputs() throws IOException, InterruptedException {
+		if (log.isDebugEnabled()) {
+			log.debug(formatLogString("Sending end-of-superstep to all iteration outputs."));
+		}
+
+		for (RecordWriter<?> eventualOutput : this.eventualOutputs) {
+			eventualOutput.sendEndOfSuperstep();
+		}
+	}
+
+	private void sendEventToSync(WorkerDoneEvent event) throws IOException, InterruptedException {
+		if (log.isInfoEnabled()) {
+			log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync"));
+		}
+		this.toSync.writeEventToAllChannels(event);
+	}
+}

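The loop in run() above is the head's whole superstep protocol: run the step function, send end-of-superstep to the iteration outputs, report to the sync task, and block on the barrier until every parallel head has finished. Below is a minimal, self-contained sketch of that control flow, using a local CyclicBarrier and hypothetical hook methods in place of Flink's network events, back channel, and brokers; it is an illustration, not the actual implementation.

import java.util.concurrent.CyclicBarrier;

/**
 * In-JVM sketch of the head's per-superstep protocol. All names are
 * hypothetical; the real task coordinates through task events and the
 * SuperstepBarrier rather than a local CyclicBarrier.
 */
public class IterationHeadLoopSketch {

    private final CyclicBarrier allWorkersDone; // stand-in for the SuperstepBarrier
    private volatile boolean terminationRequested;
    private int currentIteration = 1;

    public IterationHeadLoopSketch(int parallelism) {
        this.allWorkersDone = new CyclicBarrier(parallelism);
    }

    public void run() throws Exception {
        while (!terminationRequested) {
            runStepFunction();               // corresponds to super.run() in the task
            sendEndOfSuperstep();            // EndOfSuperstep events to all iteration outputs
            allWorkersDone.await();          // wait until all parallel heads are done

            if (convergenceReached()) {      // e.g. aggregator-based criterion or empty workset
                terminationRequested = true; // the task would broadcast a TerminationEvent here
            } else {
                currentIteration++;          // feed the tail's output back in and iterate again
            }
        }
        streamOutFinalResult();              // write to the final-solution output gates
    }

    // Hypothetical hooks standing in for the driver, the writers, and the final output.
    private void runStepFunction() {}
    private void sendEndOfSuperstep() {}
    private boolean convergenceReached() { return currentIteration >= 10; }
    private void streamOutFinalResult() {}
}
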
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
deleted file mode 100644
index e7801e4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.iterative.task;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
-import org.apache.flink.runtime.iterative.event.TerminationEvent;
-import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * An intermediate iteration task, which runs a {@link Driver} inside.
- * <p>
- * It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to its connected tasks. Furthermore,
- * intermediate tasks can also update the iteration state, either the workset or the solution set.
- * <p>
- * If the iteration state is updated, the output of this task will be sent back to the {@link IterationHeadPactTask} via
- * a {@link BlockingBackChannel} for the workset -XOR- a hash table for the solution set. In this case,
- * this task must be scheduled on the same instance as the head.
- */
-public class IterationIntermediatePactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
-
-	private static final Logger log = LoggerFactory.getLogger(IterationIntermediatePactTask.class);
-
-	private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
-
-	@Override
-	protected void initialize() throws Exception {
-		super.initialize();
-
-		// set the last output collector of this task to reflect the iteration intermediate state update
-		// a) workset update
-		// b) solution set update
-		// c) none
-
-		Collector<OT> delegate = getLastOutputCollector();
-		if (isWorksetUpdate) {
-			// sanity check: we should not have a solution set and workset update at the same time
-			// in an intermediate task
-			if (isSolutionSetUpdate) {
-				throw new IllegalStateException("Plan bug: Intermediate task performs workset and solution set update.");
-			}
-			
-			Collector<OT> outputCollector = createWorksetUpdateOutputCollector(delegate);
-
-			// we need the WorksetUpdateOutputCollector separately to count the collected elements
-			if (isWorksetIteration) {
-				worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
-			}
-
-			setLastOutputCollector(outputCollector);
-		} else if (isSolutionSetUpdate) {
-			setLastOutputCollector(createSolutionSetUpdateOutputCollector(delegate));
-		}
-	}
-
-	@Override
-	public void run() throws Exception {
-		
-		SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
-
-		while (this.running && !terminationRequested()) {
-
-			if (log.isInfoEnabled()) {
-				log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
-			}
-
-			super.run();
-
-			// check if termination was requested
-			verifyEndOfSuperstepState();
-
-			if (isWorksetUpdate && isWorksetIteration) {
-				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
-				worksetAggregator.aggregate(numCollected);
-			}
-			
-			if (log.isInfoEnabled()) {
-				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
-			}
-			
-			// let the successors know that the end of this superstep data is reached
-			sendEndOfSuperstep();
-			
-			if (isWorksetUpdate) {
-				// notify iteration head if responsible for workset update
-				worksetBackChannel.notifyOfEndOfSuperstep();
-			}
-			
-			boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
-
-			if (terminated) {
-				requestTermination();
-			}
-			else {
-				incrementIterationCounter();
-			}
-		}
-	}
-
-	private void sendEndOfSuperstep() throws IOException, InterruptedException {
-		for (RecordWriter eventualOutput : this.eventualOutputs) {
-			eventualOutput.sendEndOfSuperstep();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
new file mode 100644
index 0000000..60f0dcf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.task;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
+import org.apache.flink.runtime.iterative.event.TerminationEvent;
+import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * An intermediate iteration task, which runs a {@link Driver} inside.
+ * <p>
+ * It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to its connected tasks. Furthermore,
+ * intermediate tasks can also update the iteration state, either the workset or the solution set.
+ * <p>
+ * If the iteration state is updated, the output of this task will be sent back to the {@link IterationHeadTask} via
+ * a {@link BlockingBackChannel} for the workset -XOR- a hash table for the solution set. In this case,
+ * this task must be scheduled on the same instance as the head.
+ */
+public class IterationIntermediateTask<S extends Function, OT> extends AbstractIterativeTask<S, OT> {
+
+	private static final Logger log = LoggerFactory.getLogger(IterationIntermediateTask.class);
+
+	private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
+
+	@Override
+	protected void initialize() throws Exception {
+		super.initialize();
+
+		// set the last output collector of this task to reflect the iteration intermediate state update
+		// a) workset update
+		// b) solution set update
+		// c) none
+
+		Collector<OT> delegate = getLastOutputCollector();
+		if (isWorksetUpdate) {
+			// sanity check: we should not have a solution set and workset update at the same time
+			// in an intermediate task
+			if (isSolutionSetUpdate) {
+				throw new IllegalStateException("Plan bug: Intermediate task performs workset and solution set update.");
+			}
+			
+			Collector<OT> outputCollector = createWorksetUpdateOutputCollector(delegate);
+
+			// we need the WorksetUpdateOutputCollector separately to count the collected elements
+			if (isWorksetIteration) {
+				worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
+			}
+
+			setLastOutputCollector(outputCollector);
+		} else if (isSolutionSetUpdate) {
+			setLastOutputCollector(createSolutionSetUpdateOutputCollector(delegate));
+		}
+	}
+
+	@Override
+	public void run() throws Exception {
+		
+		SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
+
+		while (this.running && !terminationRequested()) {
+
+			if (log.isInfoEnabled()) {
+				log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
+			}
+
+			super.run();
+
+			// check if termination was requested
+			verifyEndOfSuperstepState();
+
+			if (isWorksetUpdate && isWorksetIteration) {
+				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
+				worksetAggregator.aggregate(numCollected);
+			}
+			
+			if (log.isInfoEnabled()) {
+				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
+			}
+			
+			// let the successors know that the end of this superstep data is reached
+			sendEndOfSuperstep();
+			
+			if (isWorksetUpdate) {
+				// notify iteration head if responsible for workset update
+				worksetBackChannel.notifyOfEndOfSuperstep();
+			}
+			
+			boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+
+			if (terminated) {
+				requestTermination();
+			}
+			else {
+				incrementIterationCounter();
+			}
+		}
+	}
+
+	private void sendEndOfSuperstep() throws IOException, InterruptedException {
+		for (RecordWriter eventualOutput : this.eventualOutputs) {
+			eventualOutput.sendEndOfSuperstep();
+		}
+	}
+
+}

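Head, intermediate, and tail tasks of the same iteration are co-located in one JVM, so they exchange runtime objects (the back channel, the kickoff latch, the solution set) through singleton brokers keyed by brokerKey(), as in SuperstepKickoffLatchBroker.instance().get(brokerKey()) above. Here is a minimal sketch of that hand-in/get/remove idiom; unlike this sketch, Flink's actual Broker blocks in get() until the object has been handed in.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative broker sketch, not Flink's Broker class. */
public final class MiniBroker<V> {

    private final ConcurrentMap<String, V> mediations = new ConcurrentHashMap<String, V>();

    /** Producer side: e.g. the head hands in the kickoff latch under its broker key. */
    public void handIn(String key, V obj) {
        if (mediations.putIfAbsent(key, obj) != null) {
            throw new IllegalStateException("an object is already handed in for key " + key);
        }
    }

    /** Consumer side: intermediate and tail tasks look the object up under the same key. */
    public V get(String key) {
        V value = mediations.get(key);
        if (value == null) {
            throw new IllegalStateException("nothing handed in for key " + key);
        }
        return value;
    }

    /** Cleanup, mirroring the remove() calls in the head's finally block. */
    public void remove(String key) {
        mediations.remove(key);
    }
}
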
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index fed0a17..a85e662 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.types.IntValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +37,6 @@ import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
 import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.types.Value;
 
@@ -204,7 +204,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 	}
 
 	private String formatLogString(String message) {
-		return RegularPactTask.constructLogString(message, getEnvironment().getTaskName(), this);
+		return BatchTask.constructLogString(message, getEnvironment().getTaskName(), this);
 	}
 	
 	// --------------------------------------------------------------------------------------------

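This hunk only swaps the log-string helper, but the enclosing class is the sync task the head Javadoc refers to: it counts one WorkerDoneEvent per worker per superstep and then broadcasts an AllWorkersDoneEvent (or a TerminationEvent). Below is an in-JVM sketch of that count-and-release protocol with hypothetical names; the real task exchanges these signals as task events over the network.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative stand-in for the superstep synchronization, not the Flink sync task. */
public class SuperstepSyncSketch {

    private final int numberOfWorkers;
    private final AtomicInteger pending;
    private volatile CountDownLatch allWorkersDone = new CountDownLatch(1);

    public SuperstepSyncSketch(int numberOfWorkers) {
        this.numberOfWorkers = numberOfWorkers;
        this.pending = new AtomicInteger(numberOfWorkers);
    }

    /** Each worker calls this at the end of its superstep (its WorkerDoneEvent). */
    public void workerDone() throws InterruptedException {
        CountDownLatch latch = allWorkersDone;      // pin this superstep's latch first
        if (pending.decrementAndGet() == 0) {
            pending.set(numberOfWorkers);           // reset the count for the next superstep
            allWorkersDone = new CountDownLatch(1); // fresh latch for the next round
            latch.countDown();                      // the AllWorkersDoneEvent broadcast
        } else {
            latch.await();                          // wait for the remaining workers
        }
    }
}
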
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
deleted file mode 100644
index 159d3f2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.iterative.task;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
-import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
-import org.apache.flink.util.Collector;
-
-/**
- * An iteration tail, which runs a driver inside.
- * <p>
- * If the iteration state is updated, the output of this task will be sent back to the {@link IterationHeadPactTask} via
- * a BackChannel for the workset -OR- a HashTable for the solution set. Therefore this
- * task must be scheduled on the same instance as the head. It's also possible for the tail to update *both* the workset
- * and the solution set.
- * <p>
- * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
- */
-public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
-
-	private static final Logger log = LoggerFactory.getLogger(IterationTailPactTask.class);
-
-	private SolutionSetUpdateBarrier solutionSetUpdateBarrier;
-
-	private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
-	
-
-	@Override
-	protected void initialize() throws Exception {
-		super.initialize();
-
-		// sanity check: the tail has to update either the workset or the solution set
-		if (!isWorksetUpdate && !isSolutionSetUpdate) {
-			throw new RuntimeException("The iteration tail doesn't update workset or the solution set.");
-		}
-
-		// set the last output collector of this task to reflect the iteration tail state update:
-		// a) workset update,
-		// b) solution set update, or
-		// c) merged workset and solution set update
-
-		Collector<OT> outputCollector = null;
-		if (isWorksetUpdate) {
-			outputCollector = createWorksetUpdateOutputCollector();
-
-			// we need the WorksetUpdateOutputCollector separately to count the collected elements
-			if (isWorksetIteration) {
-				worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
-			}
-		}
-
-		if (isSolutionSetUpdate) {
-			if (isWorksetIteration) {
-				outputCollector = createSolutionSetUpdateOutputCollector(outputCollector);
-			}
-			// Bulk iteration with termination criterion
-			else {
-				outputCollector = new Collector<OT>() {
-					@Override
-					public void collect(OT record) {}
-					@Override
-					public void close() {}
-				};
-			}
-
-			if (!isWorksetUpdate) {
-				solutionSetUpdateBarrier = SolutionSetUpdateBarrierBroker.instance().get(brokerKey());
-			}
-		}
-
-		setLastOutputCollector(outputCollector);
-	}
-
-	@Override
-	public void run() throws Exception {
-		
-		SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
-		
-		while (this.running && !terminationRequested()) {
-
-			if (log.isInfoEnabled()) {
-				log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
-			}
-
-			super.run();
-
-			// check if termination was requested
-			verifyEndOfSuperstepState();
-
-			if (isWorksetUpdate && isWorksetIteration) {
-				// aggregate workset update element count
-				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
-				worksetAggregator.aggregate(numCollected);
-
-			}
-
-			if (log.isInfoEnabled()) {
-				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
-			}
-			
-			if (isWorksetUpdate) {
-				// notify iteration head if responsible for workset update
-				worksetBackChannel.notifyOfEndOfSuperstep();
-			} else if (isSolutionSetUpdate) {
-				// notify iteration head if responsible for solution set update
-				solutionSetUpdateBarrier.notifySolutionSetUpdate();
-			}
-
-			boolean terminate = nextSuperStepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
-			if (terminate) {
-				requestTermination();
-			}
-			else {
-				incrementIterationCounter();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
new file mode 100644
index 0000000..9e0b560
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.task;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
+import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
+import org.apache.flink.util.Collector;
+
+/**
+ * An iteration tail, which runs a driver inside.
+ * <p>
+ * If the iteration state is updated, the output of this task will be sent back to the {@link IterationHeadTask} via
+ * a BackChannel for the workset -OR- a HashTable for the solution set. Therefore, this
+ * task must be scheduled on the same instance as the head. It's also possible for the tail to update *both* the workset
+ * and the solution set.
+ * <p>
+ * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
+ */
+public class IterationTailTask<S extends Function, OT> extends AbstractIterativeTask<S, OT> {
+
+	private static final Logger log = LoggerFactory.getLogger(IterationTailTask.class);
+
+	private SolutionSetUpdateBarrier solutionSetUpdateBarrier;
+
+	private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
+	
+
+	@Override
+	protected void initialize() throws Exception {
+		super.initialize();
+
+		// sanity check: the tail has to update either the workset or the solution set
+		if (!isWorksetUpdate && !isSolutionSetUpdate) {
+			throw new RuntimeException("The iteration tail doesn't update either the workset or the solution set.");
+		}
+
+		// set the last output collector of this task to reflect the iteration tail state update:
+		// a) workset update,
+		// b) solution set update, or
+		// c) merged workset and solution set update
+
+		Collector<OT> outputCollector = null;
+		if (isWorksetUpdate) {
+			outputCollector = createWorksetUpdateOutputCollector();
+
+			// we need the WorksetUpdateOutputCollector separately to count the collected elements
+			if (isWorksetIteration) {
+				worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
+			}
+		}
+
+		if (isSolutionSetUpdate) {
+			if (isWorksetIteration) {
+				outputCollector = createSolutionSetUpdateOutputCollector(outputCollector);
+			}
+			// Bulk iteration with termination criterion
+			else {
+				outputCollector = new Collector<OT>() {
+					@Override
+					public void collect(OT record) {}
+					@Override
+					public void close() {}
+				};
+			}
+
+			if (!isWorksetUpdate) {
+				solutionSetUpdateBarrier = SolutionSetUpdateBarrierBroker.instance().get(brokerKey());
+			}
+		}
+
+		setLastOutputCollector(outputCollector);
+	}
+
+	@Override
+	public void run() throws Exception {
+		
+		SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
+		
+		while (this.running && !terminationRequested()) {
+
+			if (log.isInfoEnabled()) {
+				log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
+			}
+
+			super.run();
+
+			// check if termination was requested
+			verifyEndOfSuperstepState();
+
+			if (isWorksetUpdate && isWorksetIteration) {
+				// aggregate workset update element count
+				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
+				worksetAggregator.aggregate(numCollected);
+
+			}
+
+			if (log.isInfoEnabled()) {
+				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
+			}
+			
+			if (isWorksetUpdate) {
+				// notify iteration head if responsible for workset update
+				worksetBackChannel.notifyOfEndOfSuperstep();
+			} else if (isSolutionSetUpdate) {
+				// notify iteration head if responsible for solution set update
+				solutionSetUpdateBarrier.notifySolutionSetUpdate();
+			}
+
+			boolean terminate = nextSuperStepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+			if (terminate) {
+				requestTermination();
+			}
+			else {
+				incrementIterationCounter();
+			}
+		}
+	}
+}
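
The run() loop above is the entire tail-side superstep protocol in one place: run the driver, report the workset element count to the aggregator, signal the head over the back channel (workset) or the update barrier (solution set), then block until the head either kicks off the next superstep or announces termination. Purely as illustration of that final handshake, here is a self-contained sketch in plain Java concurrency; SuperstepLatch, kickoff(), and awaitStartOrTermination() are invented stand-ins, not the actual SuperstepKickoffLatch API:

    public class SuperstepLatchSketch {

        // Hypothetical stand-in for Flink's SuperstepKickoffLatch: state is
        // monotonic, so a signal sent before the tail starts waiting is not lost.
        static final class SuperstepLatch {
            private int startedSuperstep = 1;
            private boolean terminated = false;

            // Head side: all workers finished, start the given superstep.
            synchronized void kickoff(int superstep) {
                startedSuperstep = superstep;
                notifyAll();
            }

            // Head side: termination criterion met.
            synchronized void terminate() {
                terminated = true;
                notifyAll();
            }

            // Tail side: block until the given superstep starts or termination
            // arrives; returns true if the iteration should terminate.
            synchronized boolean awaitStartOrTermination(int superstep) throws InterruptedException {
                while (startedSuperstep < superstep && !terminated) {
                    wait();
                }
                return terminated;
            }
        }

        public static void main(String[] args) throws Exception {
            SuperstepLatch latch = new SuperstepLatch();

            Thread tail = new Thread(() -> {
                try {
                    for (int superstep = 1; ; superstep++) {
                        System.out.println("tail: finished superstep " + superstep);
                        if (latch.awaitStartOrTermination(superstep + 1)) {
                            System.out.println("tail: termination requested");
                            return;
                        }
                    }
                } catch (InterruptedException ignored) {
                }
            });
            tail.start();

            // The "head" drives three more supersteps, then terminates.
            for (int next = 2; next <= 4; next++) {
                Thread.sleep(20);
                latch.kickoff(next);
            }
            Thread.sleep(20);
            latch.terminate();
            tail.join();
        }
    }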

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 0e0bd26..4923b3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobgraph.tasks;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,7 +29,7 @@ import org.slf4j.LoggerFactory;
 /**
  * This is the abstract base class for every task that can be executed by a TaskManager.
 * Concrete tasks like the stream vertices or the batch tasks
- * (see {@link org.apache.flink.runtime.operators.RegularPactTask}) inherit from this class.
+ * (see {@link BatchTask}) inherit from this class.
  *
  * The TaskManager invokes the methods {@link #registerInputOutput()} and {@link #invoke()} in
  * this order when executing a task. The first method is responsible for setting up input and
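
The javadoc corrected in this hunk also states the two-phase contract every task shares: the TaskManager first calls registerInputOutput() to wire up readers and writers, and only then invoke() to run the task body. A minimal sketch of that calling convention, using invented names rather than Flink's real classes:

    // Illustration only; InvokableSketch is not a real Flink class.
    abstract class InvokableSketch {

        // Phase 1: set up input readers and output writers.
        abstract void registerInputOutput();

        // Phase 2: the actual work; called exactly once, after phase 1.
        abstract void invoke() throws Exception;

        // The calling order a TaskManager-like caller guarantees.
        static void execute(InvokableSketch task) throws Exception {
            task.registerInputOutput();
            task.invoke();
        }

        public static void main(String[] args) throws Exception {
            execute(new InvokableSketch() {
                @Override void registerInputOutput() { System.out.println("wiring I/O"); }
                @Override void invoke() { System.out.println("running task body"); }
            });
        }
    }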

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index 4096f0c..8f72754 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends JoinDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends JoinDriver<IT1, IT2, OT> implements ResettableDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 
 	private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
index 38c74e0..8c964d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
@@ -38,11 +38,11 @@ import org.slf4j.LoggerFactory;
  *
  * @see FlatJoinFunction
  */
-public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 	
 	protected static final Logger LOG = LoggerFactory.getLogger(AbstractOuterJoinDriver.class);
 	
-	protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
+	protected TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
 	
 	protected volatile JoinTaskIterator<IT1, IT2, OT> outerJoinIterator; // the iterator that does the actual outer join
 	protected volatile boolean running;
@@ -50,7 +50,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements PactDrive
 	// ------------------------------------------------------------------------
 	
 	@Override
-	public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
+	public void setup(TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
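
Beyond the rename, this hunk shows the shape of the driver contract: the surrounding task owns configuration and I/O and exposes them through a TaskContext, while the driver holds only the operator logic and receives everything it needs via setup(), exactly as stored into this.taskContext above. A toy illustration of that split, with deliberately simplified stand-ins for Flink's real Driver and TaskContext interfaces (which additionally expose memory, comparators, and configuration):

    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Consumer;

    // Simplified stand-in: one string input, one collector.
    interface TaskContextSketch<OT> {
        Iterator<String> input();        // the task provides the input
        Consumer<OT> outputCollector();  // ... and the output collector
    }

    interface DriverSketch<OT> {
        void setup(TaskContextSketch<OT> context);
        void run();
    }

    // A driver owns no I/O of its own; it only remembers the context handed to it.
    class UppercaseDriver implements DriverSketch<String> {

        private TaskContextSketch<String> taskContext;

        @Override
        public void setup(TaskContextSketch<String> context) {
            this.taskContext = context; // same pattern as setup() in the hunk above
        }

        @Override
        public void run() {
            Iterator<String> in = taskContext.input();
            Consumer<String> out = taskContext.outputCollector();
            while (in.hasNext()) {
                out.accept(in.next().toUpperCase());
            }
        }

        public static void main(String[] args) {
            DriverSketch<String> driver = new UppercaseDriver();
            driver.setup(new TaskContextSketch<String>() {
                public Iterator<String> input() { return List.of("a", "b").iterator(); }
                public Consumer<String> outputCollector() { return System.out::println; }
            });
            driver.run();
        }
    }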

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
index 7b279ee..0c8dc34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
@@ -37,18 +37,18 @@ import org.slf4j.LoggerFactory;
 *
 * @see GroupCombineFunction
 */
-public class AllGroupCombineDriver<IN, OUT> implements PactDriver<GroupCombineFunction<IN, OUT>, OUT> {
+public class AllGroupCombineDriver<IN, OUT> implements Driver<GroupCombineFunction<IN, OUT>, OUT> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(AllGroupCombineDriver.class);
 
-	private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
+	private TaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
 
 	private boolean objectReuseEnabled = false;
 
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
+	public void setup(TaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
 		this.taskContext = context;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index a20fddf..255c57c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -45,11 +45,11 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @see org.apache.flink.api.common.functions.GroupReduceFunction
  */
-public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction<IT, OT>, OT> {
+public class AllGroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT, OT>, OT> {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(AllGroupReduceDriver.class);
 
-	private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
+	private TaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
 	
 	private MutableObjectIterator<IT> input;
 
@@ -62,7 +62,7 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> context) {
+	public void setup(TaskContext<GroupReduceFunction<IT, OT>, OT> context) {
 		this.taskContext = context;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index 1f58c1b..f27ae34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -38,11 +38,11 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @see org.apache.flink.api.common.functions.ReduceFunction
  */
-public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
+public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(AllReduceDriver.class);
 
-	private PactTaskContext<ReduceFunction<T>, T> taskContext;
+	private TaskContext<ReduceFunction<T>, T> taskContext;
 	
 	private MutableObjectIterator<T> input;
 
@@ -55,7 +55,7 @@ public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
+	public void setup(TaskContext<ReduceFunction<T>, T> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
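
The same PactDriver -> Driver and PactTaskContext -> TaskContext substitution repeats across the AllGroupCombineDriver, AllGroupReduceDriver, and AllReduceDriver hunks above. Semantically, AllReduceDriver applies a ReduceFunction pairwise across the entire non-grouped input until a single value remains (the AllGroup* variants instead hand the whole input to the function as one group). A hedged sketch of that fold, with ReduceFn as an invented stand-in for org.apache.flink.api.common.functions.ReduceFunction:

    import java.util.Iterator;
    import java.util.List;

    public class AllReduceSketch {

        // Invented stand-in for Flink's ReduceFunction.
        interface ReduceFn<T> {
            T reduce(T a, T b) throws Exception;
        }

        // Pairwise fold over the whole input, mirroring what AllReduceDriver computes.
        static <T> T reduceAll(Iterator<T> input, ReduceFn<T> fn) throws Exception {
            if (!input.hasNext()) {
                return null; // empty input: nothing to reduce in this sketch
            }
            T acc = input.next();
            while (input.hasNext()) {
                acc = fn.reduce(acc, input.next());
            }
            return acc;
        }

        public static void main(String[] args) throws Exception {
            System.out.println(reduceAll(List.of(1, 2, 3, 4).iterator(), (a, b) -> a + b)); // prints 10
        }
    }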