You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hs...@apache.org on 2015/10/06 17:14:24 UTC
[4/5] flink git commit: [FLINK-2815] [REFACTOR] Remove Pact from
class and file names since it is no longer valid reference
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
deleted file mode 100644
index 9cb045f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.iterative.task;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.operators.util.JoinHashMap;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.InputViewIterator;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
-import org.apache.flink.runtime.iterative.concurrent.Broker;
-import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
-import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
-import org.apache.flink.runtime.iterative.event.TerminationEvent;
-import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
-import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
-import org.apache.flink.runtime.operators.RegularPactTask;
-import org.apache.flink.runtime.operators.hash.CompactingHashTable;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-/**
- * The head is responsible for coordinating an iteration and can run a
- * {@link org.apache.flink.runtime.operators.PactDriver} inside. It will read
- * the initial input and establish a {@link BlockingBackChannel} to the iteration's tail. After successfully processing
- * the input, it will send EndOfSuperstep events to its outputs. It must also be connected to a
- * synchronization task and after each superstep, it will wait
- * until it receives an {@link AllWorkersDoneEvent} from the sync, which signals that all other heads have also finished
- * their iteration. Starting with
- * the second iteration, the input for the head is the output of the tail, transmitted through the backchannel. Once the
- * iteration is done, the head
- * will send a {@link TerminationEvent} to all it's connected tasks, signaling them to shutdown.
- * <p>
- * Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the
- * step function. - The next m output gates to to the tasks that consume the final solution. - The last output gate
- * connects to the synchronization task.
- *
- * @param <X>
- * The type of the bulk partial solution / solution set and the final output.
- * @param <Y>
- * The type of the feed-back data set (bulk partial solution / workset). For bulk iterations, {@code Y} is the
- * same as {@code X}
- */
-public class IterationHeadPactTask<X, Y, S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
-
- private static final Logger log = LoggerFactory.getLogger(IterationHeadPactTask.class);
-
- private Collector<X> finalOutputCollector;
-
- private TypeSerializerFactory<Y> feedbackTypeSerializer;
-
- private TypeSerializerFactory<X> solutionTypeSerializer;
-
- private ResultPartitionWriter toSync;
-
- private int feedbackDataInput; // workset or bulk partial solution
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- protected int getNumTaskInputs() {
- // this task has an additional input in the workset case for the initial solution set
- boolean isWorkset = config.getIsWorksetIteration();
- return driver.getNumberOfInputs() + (isWorkset ? 1 : 0);
- }
-
- @Override
- protected void initOutputs() throws Exception {
- // initialize the regular outputs first (the ones into the step function).
- super.initOutputs();
-
- // at this time, the outputs to the step function are created
- // add the outputs for the final solution
- List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
- final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
- final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
- AccumulatorRegistry.Reporter reporter = getEnvironment().getAccumulatorRegistry().getReadWriteReporter();
- this.finalOutputCollector = RegularPactTask.getOutputCollector(this, finalOutConfig,
- userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter);
-
- // sanity check the setup
- final int writersIntoStepFunction = this.eventualOutputs.size();
- final int writersIntoFinalResult = finalOutputWriters.size();
- final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
-
- if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
- throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
- }
- // now, we can instantiate the sync gate
- this.toSync = getEnvironment().getWriter(syncGateIndex);
- }
-
- /**
- * the iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and
- * hands it to the iteration tail via a {@link Broker} singleton
- **/
- private BlockingBackChannel initBackChannel() throws Exception {
-
- /* get the size of the memory available to the backchannel */
- int backChannelMemoryPages = getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory());
-
- /* allocate the memory available to the backchannel */
- List<MemorySegment> segments = new ArrayList<MemorySegment>();
- int segmentSize = getMemoryManager().getPageSize();
- getMemoryManager().allocatePages(this, segments, backChannelMemoryPages);
-
- /* instantiate the backchannel */
- BlockingBackChannel backChannel = new BlockingBackChannel(new SerializedUpdateBuffer(segments, segmentSize,
- getIOManager()));
-
- /* hand the backchannel over to the iteration tail */
- Broker<BlockingBackChannel> broker = BlockingBackChannelBroker.instance();
- broker.handIn(brokerKey(), backChannel);
-
- return backChannel;
- }
-
- private <BT> CompactingHashTable<BT> initCompactingHashTable() throws Exception {
- // get some memory
- double hashjoinMemorySize = config.getRelativeSolutionSetMemory();
- final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
-
- TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer(userCodeClassLoader);
- TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator(userCodeClassLoader);
-
- TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
- TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
-
- CompactingHashTable<BT> hashTable = null;
- List<MemorySegment> memSegments = null;
- boolean success = false;
- try {
- int numPages = getMemoryManager().computeNumberOfPages(hashjoinMemorySize);
- memSegments = getMemoryManager().allocatePages(getOwningNepheleTask(), numPages);
- hashTable = new CompactingHashTable<BT>(solutionTypeSerializer, solutionTypeComparator, memSegments);
- success = true;
- return hashTable;
- } finally {
- if (!success) {
- if (hashTable != null) {
- try {
- hashTable.close();
- } catch (Throwable t) {
- log.error("Error closing the solution set hash table after unsuccessful creation.", t);
- }
- }
- if (memSegments != null) {
- try {
- getMemoryManager().release(memSegments);
- } catch (Throwable t) {
- log.error("Error freeing memory after error during solution set hash table creation.", t);
- }
- }
- }
- }
- }
-
- private <BT> JoinHashMap<BT> initJoinHashMap() {
- TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer
- (getUserCodeClassLoader());
- TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator
- (getUserCodeClassLoader());
-
- TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
- TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
-
- return new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
- }
-
- private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
- solutionSet.open();
- solutionSet.buildTableWithUniqueKey(solutionSetInput);
- }
-
- private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
- TypeSerializer<X> serializer = solutionTypeSerializer.getSerializer();
-
- X next;
- while ((next = solutionSetInput.next(serializer.createInstance())) != null) {
- solutionSet.insertOrReplace(next);
- }
- }
-
- private SuperstepBarrier initSuperstepBarrier() {
- SuperstepBarrier barrier = new SuperstepBarrier(getUserCodeClassLoader());
- this.toSync.subscribeToEvent(barrier, AllWorkersDoneEvent.class);
- this.toSync.subscribeToEvent(barrier, TerminationEvent.class);
- return barrier;
- }
-
- @Override
- public void run() throws Exception {
- final String brokerKey = brokerKey();
- final int workerIndex = getEnvironment().getIndexInSubtaskGroup();
-
- final boolean objectSolutionSet = config.isSolutionSetUnmanaged();
-
- CompactingHashTable<X> solutionSet = null; // if workset iteration
- JoinHashMap<X> solutionSetObjectMap = null; // if workset iteration with unmanaged solution set
-
- boolean waitForSolutionSetUpdate = config.getWaitForSolutionSetUpdate();
- boolean isWorksetIteration = config.getIsWorksetIteration();
-
- try {
- /* used for receiving the current iteration result from iteration tail */
- SuperstepKickoffLatch nextStepKickoff = new SuperstepKickoffLatch();
- SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff);
-
- BlockingBackChannel backChannel = initBackChannel();
- SuperstepBarrier barrier = initSuperstepBarrier();
- SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;
-
- feedbackDataInput = config.getIterationHeadPartialSolutionOrWorksetInputIndex();
- feedbackTypeSerializer = this.getInputSerializer(feedbackDataInput);
- excludeFromReset(feedbackDataInput);
-
- int initialSolutionSetInput;
- if (isWorksetIteration) {
- initialSolutionSetInput = config.getIterationHeadSolutionSetInputIndex();
- solutionTypeSerializer = config.getSolutionSetSerializer(getUserCodeClassLoader());
-
- // setup the index for the solution set
- @SuppressWarnings("unchecked")
- MutableObjectIterator<X> solutionSetInput = (MutableObjectIterator<X>) createInputIterator(inputReaders[initialSolutionSetInput], solutionTypeSerializer);
-
- // read the initial solution set
- if (objectSolutionSet) {
- solutionSetObjectMap = initJoinHashMap();
- readInitialSolutionSet(solutionSetObjectMap, solutionSetInput);
- SolutionSetBroker.instance().handIn(brokerKey, solutionSetObjectMap);
- } else {
- solutionSet = initCompactingHashTable();
- readInitialSolutionSet(solutionSet, solutionSetInput);
- SolutionSetBroker.instance().handIn(brokerKey, solutionSet);
- }
-
- if (waitForSolutionSetUpdate) {
- solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
- SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
- }
- }
- else {
- // bulk iteration case
- @SuppressWarnings("unchecked")
- TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) feedbackTypeSerializer;
- solutionTypeSerializer = solSer;
-
- // = termination Criterion tail
- if (waitForSolutionSetUpdate) {
- solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
- SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
- }
- }
-
- // instantiate all aggregators and register them at the iteration global registry
- RuntimeAggregatorRegistry aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators
- (getUserCodeClassLoader()));
- IterationAggregatorBroker.instance().handIn(brokerKey, aggregatorRegistry);
-
- DataInputView superstepResult = null;
-
- while (this.running && !terminationRequested()) {
-
- if (log.isInfoEnabled()) {
- log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
- }
-
- barrier.setup();
-
- if (waitForSolutionSetUpdate) {
- solutionSetUpdateBarrier.setup();
- }
-
- if (!inFirstIteration()) {
- feedBackSuperstepResult(superstepResult);
- }
-
- super.run();
-
- // signal to connected tasks that we are done with the superstep
- sendEndOfSuperstepToAllIterationOutputs();
-
- if (waitForSolutionSetUpdate) {
- solutionSetUpdateBarrier.waitForSolutionSetUpdate();
- }
-
- // blocking call to wait for the result
- superstepResult = backChannel.getReadEndAfterSuperstepEnded();
- if (log.isInfoEnabled()) {
- log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
- }
-
- sendEventToSync(new WorkerDoneEvent(workerIndex, aggregatorRegistry.getAllAggregators()));
-
- if (log.isInfoEnabled()) {
- log.info(formatLogString("waiting for other workers in iteration [" + currentIteration() + "]"));
- }
-
- barrier.waitForOtherWorkers();
-
- if (barrier.terminationSignaled()) {
- if (log.isInfoEnabled()) {
- log.info(formatLogString("head received termination request in iteration ["
- + currentIteration()
- + "]"));
- }
- requestTermination();
- nextStepKickoff.signalTermination();
- } else {
- incrementIterationCounter();
-
- String[] globalAggregateNames = barrier.getAggregatorNames();
- Value[] globalAggregates = barrier.getAggregates();
- aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
-
- nextStepKickoff.triggerNextSuperstep();
- }
- }
-
- if (log.isInfoEnabled()) {
- log.info(formatLogString("streaming out final result after [" + currentIteration() + "] iterations"));
- }
-
- if (isWorksetIteration) {
- if (objectSolutionSet) {
- streamSolutionSetToFinalOutput(solutionSetObjectMap);
- } else {
- streamSolutionSetToFinalOutput(solutionSet);
- }
- } else {
- streamOutFinalOutputBulk(new InputViewIterator<X>(superstepResult, this.solutionTypeSerializer.getSerializer()));
- }
-
- this.finalOutputCollector.close();
-
- } finally {
- // make sure we unregister everything from the broker:
- // - backchannel
- // - aggregator registry
- // - solution set index
- IterationAggregatorBroker.instance().remove(brokerKey);
- BlockingBackChannelBroker.instance().remove(brokerKey);
- SuperstepKickoffLatchBroker.instance().remove(brokerKey);
- SolutionSetBroker.instance().remove(brokerKey);
- SolutionSetUpdateBarrierBroker.instance().remove(brokerKey);
-
- if (solutionSet != null) {
- solutionSet.close();
- }
- }
- }
-
- private void streamOutFinalOutputBulk(MutableObjectIterator<X> results) throws IOException {
- final Collector<X> out = this.finalOutputCollector;
- X record = this.solutionTypeSerializer.getSerializer().createInstance();
-
- while ((record = results.next(record)) != null) {
- out.collect(record);
- }
- }
-
- private void streamSolutionSetToFinalOutput(CompactingHashTable<X> hashTable) throws IOException {
- final MutableObjectIterator<X> results = hashTable.getEntryIterator();
- final Collector<X> output = this.finalOutputCollector;
- X record = solutionTypeSerializer.getSerializer().createInstance();
-
- while ((record = results.next(record)) != null) {
- output.collect(record);
- }
- }
-
- @SuppressWarnings("unchecked")
- private void streamSolutionSetToFinalOutput(JoinHashMap<X> soluionSet) throws IOException {
- final Collector<X> output = this.finalOutputCollector;
- for (Object e : soluionSet.values()) {
- output.collect((X) e);
- }
- }
-
- private void feedBackSuperstepResult(DataInputView superstepResult) {
- this.inputs[this.feedbackDataInput] =
- new InputViewIterator<Y>(superstepResult, this.feedbackTypeSerializer.getSerializer());
- }
-
- private void sendEndOfSuperstepToAllIterationOutputs() throws IOException, InterruptedException {
- if (log.isDebugEnabled()) {
- log.debug(formatLogString("Sending end-of-superstep to all iteration outputs."));
- }
-
- for (RecordWriter<?> eventualOutput : this.eventualOutputs) {
- eventualOutput.sendEndOfSuperstep();
- }
- }
-
- private void sendEventToSync(WorkerDoneEvent event) throws IOException, InterruptedException {
- if (log.isInfoEnabled()) {
- log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync"));
- }
- this.toSync.writeEventToAllChannels(event);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
new file mode 100644
index 0000000..c6268f4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.task;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.operators.Driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.operators.util.JoinHashMap;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
+import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
+import org.apache.flink.runtime.iterative.concurrent.Broker;
+import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
+import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
+import org.apache.flink.runtime.iterative.event.TerminationEvent;
+import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
+import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.hash.CompactingHashTable;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * The head is responsible for coordinating an iteration and can run a
+ * {@link Driver} inside. It will read
+ * the initial input and establish a {@link BlockingBackChannel} to the iteration's tail. After successfully processing
+ * the input, it will send EndOfSuperstep events to its outputs. It must also be connected to a
+ * synchronization task and after each superstep, it will wait
+ * until it receives an {@link AllWorkersDoneEvent} from the sync, which signals that all other heads have also finished
+ * their iteration. Starting with
+ * the second iteration, the input for the head is the output of the tail, transmitted through the backchannel. Once the
+ * iteration is done, the head
+ * will send a {@link TerminationEvent} to all its connected tasks, signaling them to shut down.
+ * <p>
+ * Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the
+ * step function. - The next m output gates go to the tasks that consume the final solution. - The last output gate
+ * connects to the synchronization task.
+ *
+ * @param <X>
+ * The type of the bulk partial solution / solution set and the final output.
+ * @param <Y>
+ * The type of the feed-back data set (bulk partial solution / workset). For bulk iterations, {@code Y} is the
+ * same as {@code X}
+ */
+public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIterativeTask<S, OT> {
+
+ private static final Logger log = LoggerFactory.getLogger(IterationHeadTask.class);
+
+ private Collector<X> finalOutputCollector;
+
+ private TypeSerializerFactory<Y> feedbackTypeSerializer;
+
+ private TypeSerializerFactory<X> solutionTypeSerializer;
+
+ private ResultPartitionWriter toSync;
+
+ private int feedbackDataInput; // workset or bulk partial solution
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ protected int getNumTaskInputs() {
+ // this task has an additional input in the workset case for the initial solution set
+ boolean isWorkset = config.getIsWorksetIteration();
+ return driver.getNumberOfInputs() + (isWorkset ? 1 : 0);
+ }
+
+ @Override
+ protected void initOutputs() throws Exception {
+ // initialize the regular outputs first (the ones into the step function).
+ super.initOutputs();
+
+ // at this time, the outputs to the step function are created
+ // add the outputs for the final solution
+ List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
+ final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
+ final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
+ AccumulatorRegistry.Reporter reporter = getEnvironment().getAccumulatorRegistry().getReadWriteReporter();
+ this.finalOutputCollector = BatchTask.getOutputCollector(this, finalOutConfig,
+ userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter);
+
+ // sanity check the setup
+ final int writersIntoStepFunction = this.eventualOutputs.size();
+ final int writersIntoFinalResult = finalOutputWriters.size();
+ final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
+
+ if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
+ throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
+ }
+ // now, we can instantiate the sync gate
+ this.toSync = getEnvironment().getWriter(syncGateIndex);
+ }
+
+ /**
+ * the iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and
+ * hands it to the iteration tail via a {@link Broker} singleton
+ **/
+ private BlockingBackChannel initBackChannel() throws Exception {
+
+ /* get the size of the memory available to the backchannel */
+ int backChannelMemoryPages = getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory());
+
+ /* allocate the memory available to the backchannel */
+ List<MemorySegment> segments = new ArrayList<MemorySegment>();
+ int segmentSize = getMemoryManager().getPageSize();
+ getMemoryManager().allocatePages(this, segments, backChannelMemoryPages);
+
+ /* instantiate the backchannel */
+ BlockingBackChannel backChannel = new BlockingBackChannel(new SerializedUpdateBuffer(segments, segmentSize,
+ getIOManager()));
+
+ /* hand the backchannel over to the iteration tail */
+ Broker<BlockingBackChannel> broker = BlockingBackChannelBroker.instance();
+ broker.handIn(brokerKey(), backChannel);
+
+ return backChannel;
+ }
+
+ private <BT> CompactingHashTable<BT> initCompactingHashTable() throws Exception {
+ // get some memory
+ double hashjoinMemorySize = config.getRelativeSolutionSetMemory();
+ final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
+
+ TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer(userCodeClassLoader);
+ TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator(userCodeClassLoader);
+
+ TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
+ TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
+
+ CompactingHashTable<BT> hashTable = null;
+ List<MemorySegment> memSegments = null;
+ boolean success = false;
+ try {
+ int numPages = getMemoryManager().computeNumberOfPages(hashjoinMemorySize);
+ memSegments = getMemoryManager().allocatePages(getOwningNepheleTask(), numPages);
+ hashTable = new CompactingHashTable<BT>(solutionTypeSerializer, solutionTypeComparator, memSegments);
+ success = true;
+ return hashTable;
+ } finally {
+ if (!success) {
+ if (hashTable != null) {
+ try {
+ hashTable.close();
+ } catch (Throwable t) {
+ log.error("Error closing the solution set hash table after unsuccessful creation.", t);
+ }
+ }
+ if (memSegments != null) {
+ try {
+ getMemoryManager().release(memSegments);
+ } catch (Throwable t) {
+ log.error("Error freeing memory after error during solution set hash table creation.", t);
+ }
+ }
+ }
+ }
+ }
+
+ private <BT> JoinHashMap<BT> initJoinHashMap() {
+ TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer
+ (getUserCodeClassLoader());
+ TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator
+ (getUserCodeClassLoader());
+
+ TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
+ TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
+
+ return new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
+ }
+
+ private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
+ solutionSet.open();
+ solutionSet.buildTableWithUniqueKey(solutionSetInput);
+ }
+
+ private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
+ TypeSerializer<X> serializer = solutionTypeSerializer.getSerializer();
+
+ X next;
+ while ((next = solutionSetInput.next(serializer.createInstance())) != null) {
+ solutionSet.insertOrReplace(next);
+ }
+ }
+
+ private SuperstepBarrier initSuperstepBarrier() {
+ SuperstepBarrier barrier = new SuperstepBarrier(getUserCodeClassLoader());
+ this.toSync.subscribeToEvent(barrier, AllWorkersDoneEvent.class);
+ this.toSync.subscribeToEvent(barrier, TerminationEvent.class);
+ return barrier;
+ }
+
+ @Override
+ public void run() throws Exception {
+ final String brokerKey = brokerKey();
+ final int workerIndex = getEnvironment().getIndexInSubtaskGroup();
+
+ final boolean objectSolutionSet = config.isSolutionSetUnmanaged();
+
+ CompactingHashTable<X> solutionSet = null; // if workset iteration
+ JoinHashMap<X> solutionSetObjectMap = null; // if workset iteration with unmanaged solution set
+
+ boolean waitForSolutionSetUpdate = config.getWaitForSolutionSetUpdate();
+ boolean isWorksetIteration = config.getIsWorksetIteration();
+
+ try {
+ /* used for receiving the current iteration result from iteration tail */
+ SuperstepKickoffLatch nextStepKickoff = new SuperstepKickoffLatch();
+ SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff);
+
+ BlockingBackChannel backChannel = initBackChannel();
+ SuperstepBarrier barrier = initSuperstepBarrier();
+ SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;
+
+ feedbackDataInput = config.getIterationHeadPartialSolutionOrWorksetInputIndex();
+ feedbackTypeSerializer = this.getInputSerializer(feedbackDataInput);
+ excludeFromReset(feedbackDataInput);
+
+ int initialSolutionSetInput;
+ if (isWorksetIteration) {
+ initialSolutionSetInput = config.getIterationHeadSolutionSetInputIndex();
+ solutionTypeSerializer = config.getSolutionSetSerializer(getUserCodeClassLoader());
+
+ // setup the index for the solution set
+ @SuppressWarnings("unchecked")
+ MutableObjectIterator<X> solutionSetInput = (MutableObjectIterator<X>) createInputIterator(inputReaders[initialSolutionSetInput], solutionTypeSerializer);
+
+ // read the initial solution set
+ if (objectSolutionSet) {
+ solutionSetObjectMap = initJoinHashMap();
+ readInitialSolutionSet(solutionSetObjectMap, solutionSetInput);
+ SolutionSetBroker.instance().handIn(brokerKey, solutionSetObjectMap);
+ } else {
+ solutionSet = initCompactingHashTable();
+ readInitialSolutionSet(solutionSet, solutionSetInput);
+ SolutionSetBroker.instance().handIn(brokerKey, solutionSet);
+ }
+
+ if (waitForSolutionSetUpdate) {
+ solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
+ SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
+ }
+ }
+ else {
+ // bulk iteration case
+ @SuppressWarnings("unchecked")
+ TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) feedbackTypeSerializer;
+ solutionTypeSerializer = solSer;
+
+ // = termination Criterion tail
+ if (waitForSolutionSetUpdate) {
+ solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
+ SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
+ }
+ }
+
+ // instantiate all aggregators and register them at the iteration global registry
+ RuntimeAggregatorRegistry aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators
+ (getUserCodeClassLoader()));
+ IterationAggregatorBroker.instance().handIn(brokerKey, aggregatorRegistry);
+
+ DataInputView superstepResult = null;
+
+ while (this.running && !terminationRequested()) {
+
+ if (log.isInfoEnabled()) {
+ log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
+ }
+
+ barrier.setup();
+
+ if (waitForSolutionSetUpdate) {
+ solutionSetUpdateBarrier.setup();
+ }
+
+ if (!inFirstIteration()) {
+ feedBackSuperstepResult(superstepResult);
+ }
+
+ super.run();
+
+ // signal to connected tasks that we are done with the superstep
+ sendEndOfSuperstepToAllIterationOutputs();
+
+ if (waitForSolutionSetUpdate) {
+ solutionSetUpdateBarrier.waitForSolutionSetUpdate();
+ }
+
+ // blocking call to wait for the result
+ superstepResult = backChannel.getReadEndAfterSuperstepEnded();
+ if (log.isInfoEnabled()) {
+ log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
+ }
+
+ sendEventToSync(new WorkerDoneEvent(workerIndex, aggregatorRegistry.getAllAggregators()));
+
+ if (log.isInfoEnabled()) {
+ log.info(formatLogString("waiting for other workers in iteration [" + currentIteration() + "]"));
+ }
+
+ barrier.waitForOtherWorkers();
+
+ if (barrier.terminationSignaled()) {
+ if (log.isInfoEnabled()) {
+ log.info(formatLogString("head received termination request in iteration ["
+ + currentIteration()
+ + "]"));
+ }
+ requestTermination();
+ nextStepKickoff.signalTermination();
+ } else {
+ incrementIterationCounter();
+
+ String[] globalAggregateNames = barrier.getAggregatorNames();
+ Value[] globalAggregates = barrier.getAggregates();
+ aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
+
+ nextStepKickoff.triggerNextSuperstep();
+ }
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info(formatLogString("streaming out final result after [" + currentIteration() + "] iterations"));
+ }
+
+ if (isWorksetIteration) {
+ if (objectSolutionSet) {
+ streamSolutionSetToFinalOutput(solutionSetObjectMap);
+ } else {
+ streamSolutionSetToFinalOutput(solutionSet);
+ }
+ } else {
+ streamOutFinalOutputBulk(new InputViewIterator<X>(superstepResult, this.solutionTypeSerializer.getSerializer()));
+ }
+
+ this.finalOutputCollector.close();
+
+ } finally {
+ // make sure we unregister everything from the broker:
+ // - backchannel
+ // - aggregator registry
+ // - solution set index
+ IterationAggregatorBroker.instance().remove(brokerKey);
+ BlockingBackChannelBroker.instance().remove(brokerKey);
+ SuperstepKickoffLatchBroker.instance().remove(brokerKey);
+ SolutionSetBroker.instance().remove(brokerKey);
+ SolutionSetUpdateBarrierBroker.instance().remove(brokerKey);
+
+ if (solutionSet != null) {
+ solutionSet.close();
+ }
+ }
+ }
+
+ private void streamOutFinalOutputBulk(MutableObjectIterator<X> results) throws IOException {
+ final Collector<X> out = this.finalOutputCollector;
+ X record = this.solutionTypeSerializer.getSerializer().createInstance();
+
+ while ((record = results.next(record)) != null) {
+ out.collect(record);
+ }
+ }
+
+ private void streamSolutionSetToFinalOutput(CompactingHashTable<X> hashTable) throws IOException {
+ final MutableObjectIterator<X> results = hashTable.getEntryIterator();
+ final Collector<X> output = this.finalOutputCollector;
+ X record = solutionTypeSerializer.getSerializer().createInstance();
+
+ while ((record = results.next(record)) != null) {
+ output.collect(record);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void streamSolutionSetToFinalOutput(JoinHashMap<X> soluionSet) throws IOException {
+ final Collector<X> output = this.finalOutputCollector;
+ for (Object e : soluionSet.values()) {
+ output.collect((X) e);
+ }
+ }
+
+ private void feedBackSuperstepResult(DataInputView superstepResult) {
+ this.inputs[this.feedbackDataInput] =
+ new InputViewIterator<Y>(superstepResult, this.feedbackTypeSerializer.getSerializer());
+ }
+
+ private void sendEndOfSuperstepToAllIterationOutputs() throws IOException, InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug(formatLogString("Sending end-of-superstep to all iteration outputs."));
+ }
+
+ for (RecordWriter<?> eventualOutput : this.eventualOutputs) {
+ eventualOutput.sendEndOfSuperstep();
+ }
+ }
+
+ private void sendEventToSync(WorkerDoneEvent event) throws IOException, InterruptedException {
+ if (log.isInfoEnabled()) {
+ log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync"));
+ }
+ this.toSync.writeEventToAllChannels(event);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
deleted file mode 100644
index e7801e4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.iterative.task;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
-import org.apache.flink.runtime.iterative.event.TerminationEvent;
-import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * An intermediate iteration task, which runs a Driver}inside.
- * <p>
- * It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to it's connected tasks. Furthermore
- * intermediate tasks can also update the iteration state, either the workset or the solution set.
- * <p>
- * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadPactTask} via
- * a {@link BlockingBackChannel} for the workset -XOR- a eHashTable for the solution set. In this case
- * this task must be scheduled on the same instance as the head.
- */
-public class IterationIntermediatePactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
-
- private static final Logger log = LoggerFactory.getLogger(IterationIntermediatePactTask.class);
-
- private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
-
- @Override
- protected void initialize() throws Exception {
- super.initialize();
-
- // set the last output collector of this task to reflect the iteration intermediate state update
- // a) workset update
- // b) solution set update
- // c) none
-
- Collector<OT> delegate = getLastOutputCollector();
- if (isWorksetUpdate) {
- // sanity check: we should not have a solution set and workset update at the same time
- // in an intermediate task
- if (isSolutionSetUpdate) {
- throw new IllegalStateException("Plan bug: Intermediate task performs workset and solutions set update.");
- }
-
- Collector<OT> outputCollector = createWorksetUpdateOutputCollector(delegate);
-
- // we need the WorksetUpdateOutputCollector separately to count the collected elements
- if (isWorksetIteration) {
- worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
- }
-
- setLastOutputCollector(outputCollector);
- } else if (isSolutionSetUpdate) {
- setLastOutputCollector(createSolutionSetUpdateOutputCollector(delegate));
- }
- }
-
- @Override
- public void run() throws Exception {
-
- SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
-
- while (this.running && !terminationRequested()) {
-
- if (log.isInfoEnabled()) {
- log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
- }
-
- super.run();
-
- // check if termination was requested
- verifyEndOfSuperstepState();
-
- if (isWorksetUpdate && isWorksetIteration) {
- long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
- worksetAggregator.aggregate(numCollected);
- }
-
- if (log.isInfoEnabled()) {
- log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
- }
-
- // let the successors know that the end of this superstep data is reached
- sendEndOfSuperstep();
-
- if (isWorksetUpdate) {
- // notify iteration head if responsible for workset update
- worksetBackChannel.notifyOfEndOfSuperstep();
- }
-
- boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
-
- if (terminated) {
- requestTermination();
- }
- else {
- incrementIterationCounter();
- }
- }
- }
-
- private void sendEndOfSuperstep() throws IOException, InterruptedException {
- for (RecordWriter eventualOutput : this.eventualOutputs) {
- eventualOutput.sendEndOfSuperstep();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
new file mode 100644
index 0000000..60f0dcf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.task;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
+import org.apache.flink.runtime.iterative.event.TerminationEvent;
+import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * An intermediate iteration task, which runs a driver inside.
+ * <p>
+ * It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to its connected tasks. Furthermore
+ * intermediate tasks can also update the iteration state, either the workset or the solution set.
+ * <p>
+ * If the iteration state is updated, the output of this task will be sent back to the {@link IterationHeadTask} via
+ * a {@link BlockingBackChannel} for the workset -XOR- a HashTable for the solution set. In this case
+ * this task must be scheduled on the same instance as the head.
+ */
+public class IterationIntermediateTask<S extends Function, OT> extends AbstractIterativeTask<S, OT> {
+
+ private static final Logger log = LoggerFactory.getLogger(IterationIntermediateTask.class);
+
+ private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
+
+ @Override
+ protected void initialize() throws Exception {
+ super.initialize();
+
+ // set the last output collector of this task to reflect the iteration intermediate state update
+ // a) workset update
+ // b) solution set update
+ // c) none
+
+ Collector<OT> delegate = getLastOutputCollector();
+ if (isWorksetUpdate) {
+ // sanity check: we should not have a solution set and workset update at the same time
+ // in an intermediate task
+ if (isSolutionSetUpdate) {
+ throw new IllegalStateException("Plan bug: Intermediate task performs workset and solutions set update.");
+ }
+
+ Collector<OT> outputCollector = createWorksetUpdateOutputCollector(delegate);
+
+ // we need the WorksetUpdateOutputCollector separately to count the collected elements
+ if (isWorksetIteration) {
+ worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
+ }
+
+ setLastOutputCollector(outputCollector);
+ } else if (isSolutionSetUpdate) {
+ setLastOutputCollector(createSolutionSetUpdateOutputCollector(delegate));
+ }
+ }
+
+ @Override
+ public void run() throws Exception {
+
+ SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
+
+ while (this.running && !terminationRequested()) {
+
+ if (log.isInfoEnabled()) {
+ log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
+ }
+
+ super.run();
+
+ // check if termination was requested
+ verifyEndOfSuperstepState();
+
+ if (isWorksetUpdate && isWorksetIteration) {
+ long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
+ worksetAggregator.aggregate(numCollected);
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
+ }
+
+ // let the successors know that the end of this superstep data is reached
+ sendEndOfSuperstep();
+
+ if (isWorksetUpdate) {
+ // notify iteration head if responsible for workset update
+ worksetBackChannel.notifyOfEndOfSuperstep();
+ }
+
+ boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+
+ if (terminated) {
+ requestTermination();
+ }
+ else {
+ incrementIterationCounter();
+ }
+ }
+ }
+
+ private void sendEndOfSuperstep() throws IOException, InterruptedException {
+ for (RecordWriter eventualOutput : this.eventualOutputs) {
+ eventualOutput.sendEndOfSuperstep();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index fed0a17..a85e662 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.types.IntValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,6 @@ import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.types.Value;
@@ -204,7 +204,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
}
private String formatLogString(String message) {
- return RegularPactTask.constructLogString(message, getEnvironment().getTaskName(), this);
+ return BatchTask.constructLogString(message, getEnvironment().getTaskName(), this);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
deleted file mode 100644
index 159d3f2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.iterative.task;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
-import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
-import org.apache.flink.util.Collector;
-
-/**
- * An iteration tail, which runs a driver inside.
- * <p>
- * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadPactTask} via
- * a BackChannel for the workset -OR- a HashTable for the solution set. Therefore this
- * task must be scheduled on the same instance as the head. It's also possible for the tail to update *both* the workset
- * and the solution set.
- * <p>
- * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
- */
-public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
-
- private static final Logger log = LoggerFactory.getLogger(IterationTailPactTask.class);
-
- private SolutionSetUpdateBarrier solutionSetUpdateBarrier;
-
- private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
-
-
- @Override
- protected void initialize() throws Exception {
- super.initialize();
-
- // sanity check: the tail has to update either the workset or the solution set
- if (!isWorksetUpdate && !isSolutionSetUpdate) {
- throw new RuntimeException("The iteration tail doesn't update workset or the solution set.");
- }
-
- // set the last output collector of this task to reflect the iteration tail state update:
- // a) workset update,
- // b) solution set update, or
- // c) merged workset and solution set update
-
- Collector<OT> outputCollector = null;
- if (isWorksetUpdate) {
- outputCollector = createWorksetUpdateOutputCollector();
-
- // we need the WorksetUpdateOutputCollector separately to count the collected elements
- if (isWorksetIteration) {
- worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
- }
- }
-
- if (isSolutionSetUpdate) {
- if (isWorksetIteration) {
- outputCollector = createSolutionSetUpdateOutputCollector(outputCollector);
- }
- // Bulk iteration with termination criterion
- else {
- outputCollector = new Collector<OT>() {
- @Override
- public void collect(OT record) {}
- @Override
- public void close() {}
- };
- }
-
- if (!isWorksetUpdate) {
- solutionSetUpdateBarrier = SolutionSetUpdateBarrierBroker.instance().get(brokerKey());
- }
- }
-
- setLastOutputCollector(outputCollector);
- }
-
- @Override
- public void run() throws Exception {
-
- SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
-
- while (this.running && !terminationRequested()) {
-
- if (log.isInfoEnabled()) {
- log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
- }
-
- super.run();
-
- // check if termination was requested
- verifyEndOfSuperstepState();
-
- if (isWorksetUpdate && isWorksetIteration) {
- // aggregate workset update element count
- long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
- worksetAggregator.aggregate(numCollected);
-
- }
-
- if (log.isInfoEnabled()) {
- log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
- }
-
- if (isWorksetUpdate) {
- // notify iteration head if responsible for workset update
- worksetBackChannel.notifyOfEndOfSuperstep();
- } else if (isSolutionSetUpdate) {
- // notify iteration head if responsible for solution set update
- solutionSetUpdateBarrier.notifySolutionSetUpdate();
- }
-
- boolean terminate = nextSuperStepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
- if (terminate) {
- requestTermination();
- }
- else {
- incrementIterationCounter();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
new file mode 100644
index 0000000..9e0b560
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.task;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
+import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
+import org.apache.flink.util.Collector;
+
+/**
+ * An iteration tail, which runs a driver inside.
+ * <p>
+ * If the iteration state is updated, the output of this task will be sent back to the {@link IterationHeadTask} via
+ * a BackChannel for the workset -OR- a HashTable for the solution set. Therefore this
+ * task must be scheduled on the same instance as the head. It's also possible for the tail to update *both* the workset
+ * and the solution set.
+ * <p>
+ * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
+ */
+public class IterationTailTask<S extends Function, OT> extends AbstractIterativeTask<S, OT> {
+
+ private static final Logger log = LoggerFactory.getLogger(IterationTailTask.class);
+
+ private SolutionSetUpdateBarrier solutionSetUpdateBarrier;
+
+ private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
+
+
+ @Override
+ protected void initialize() throws Exception {
+ super.initialize();
+
+ // sanity check: the tail has to update either the workset or the solution set
+ if (!isWorksetUpdate && !isSolutionSetUpdate) {
+ throw new RuntimeException("The iteration tail doesn't update workset or the solution set.");
+ }
+
+ // set the last output collector of this task to reflect the iteration tail state update:
+ // a) workset update,
+ // b) solution set update, or
+ // c) merged workset and solution set update
+
+ Collector<OT> outputCollector = null;
+ if (isWorksetUpdate) {
+ outputCollector = createWorksetUpdateOutputCollector();
+
+ // we need the WorksetUpdateOutputCollector separately to count the collected elements
+ if (isWorksetIteration) {
+ worksetUpdateOutputCollector = (WorksetUpdateOutputCollector<OT>) outputCollector;
+ }
+ }
+
+ if (isSolutionSetUpdate) {
+ if (isWorksetIteration) {
+ outputCollector = createSolutionSetUpdateOutputCollector(outputCollector);
+ }
+ // Bulk iteration with termination criterion
+ else {
+ outputCollector = new Collector<OT>() {
+ @Override
+ public void collect(OT record) {}
+ @Override
+ public void close() {}
+ };
+ }
+
+ if (!isWorksetUpdate) {
+ solutionSetUpdateBarrier = SolutionSetUpdateBarrierBroker.instance().get(brokerKey());
+ }
+ }
+
+ setLastOutputCollector(outputCollector);
+ }
+
+ @Override
+ public void run() throws Exception {
+
+ SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
+
+ while (this.running && !terminationRequested()) {
+
+ if (log.isInfoEnabled()) {
+ log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
+ }
+
+ super.run();
+
+ // check if termination was requested
+ verifyEndOfSuperstepState();
+
+ if (isWorksetUpdate && isWorksetIteration) {
+ // aggregate workset update element count
+ long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
+ worksetAggregator.aggregate(numCollected);
+
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
+ }
+
+ if (isWorksetUpdate) {
+ // notify iteration head if responsible for workset update
+ worksetBackChannel.notifyOfEndOfSuperstep();
+ } else if (isSolutionSetUpdate) {
+ // notify iteration head if responsible for solution set update
+ solutionSetUpdateBarrier.notifySolutionSetUpdate();
+ }
+
+ boolean terminate = nextSuperStepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+ if (terminate) {
+ requestTermination();
+ }
+ else {
+ incrementIterationCounter();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 0e0bd26..4923b3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,7 +29,7 @@ import org.slf4j.LoggerFactory;
/**
* This is the abstract base class for every task that can be executed by a TaskManager.
* Concrete tasks like the stream vertices of the batch tasks
- * (see {@link org.apache.flink.runtime.operators.RegularPactTask}) inherit from this class.
+ * (see {@link BatchTask}) inherit from this class.
*
* The TaskManager invokes the methods {@link #registerInputOutput()} and {@link #invoke()} in
* this order when executing a task. The first method is responsible for setting up input and
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index 4096f0c..8f72754 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
-public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends JoinDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends JoinDriver<IT1, IT2, OT> implements ResettableDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator;
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
index 38c74e0..8c964d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
@@ -38,11 +38,11 @@ import org.slf4j.LoggerFactory;
*
* @see FlatJoinFunction
*/
-public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT2, OT>, OT> {
protected static final Logger LOG = LoggerFactory.getLogger(AbstractOuterJoinDriver.class);
- protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
+ protected TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
protected volatile JoinTaskIterator<IT1, IT2, OT> outerJoinIterator; // the iterator that does the actual outer join
protected volatile boolean running;
@@ -50,7 +50,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements PactDrive
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
+ public void setup(TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
this.taskContext = context;
this.running = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
index 7b279ee..0c8dc34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
@@ -37,18 +37,18 @@ import org.slf4j.LoggerFactory;
*
* @see GroupCombineFunction
*/
-public class AllGroupCombineDriver<IN, OUT> implements PactDriver<GroupCombineFunction<IN, OUT>, OUT> {
+public class AllGroupCombineDriver<IN, OUT> implements Driver<GroupCombineFunction<IN, OUT>, OUT> {
private static final Logger LOG = LoggerFactory.getLogger(AllGroupCombineDriver.class);
- private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
+ private TaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
private boolean objectReuseEnabled = false;
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
+ public void setup(TaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
this.taskContext = context;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index a20fddf..255c57c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -45,11 +45,11 @@ import org.apache.flink.util.MutableObjectIterator;
*
* @see org.apache.flink.api.common.functions.GroupReduceFunction
*/
-public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction<IT, OT>, OT> {
+public class AllGroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT, OT>, OT> {
private static final Logger LOG = LoggerFactory.getLogger(AllGroupReduceDriver.class);
- private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
+ private TaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
private MutableObjectIterator<IT> input;
@@ -62,7 +62,7 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> context) {
+ public void setup(TaskContext<GroupReduceFunction<IT, OT>, OT> context) {
this.taskContext = context;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index 1f58c1b..f27ae34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -38,11 +38,11 @@ import org.apache.flink.util.MutableObjectIterator;
*
* @see org.apache.flink.api.common.functions.ReduceFunction
*/
-public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
+public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
private static final Logger LOG = LoggerFactory.getLogger(AllReduceDriver.class);
- private PactTaskContext<ReduceFunction<T>, T> taskContext;
+ private TaskContext<ReduceFunction<T>, T> taskContext;
private MutableObjectIterator<T> input;
@@ -55,7 +55,7 @@ public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
// ------------------------------------------------------------------------
@Override
- public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
+ public void setup(TaskContext<ReduceFunction<T>, T> context) {
this.taskContext = context;
this.running = true;
}