You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/04 12:35:25 UTC
[2/7] flink git commit: [FLINK-6880] [runtime] Activate checkstyle
for runtime/iterative
[FLINK-6880] [runtime] Activate checkstyle for runtime/iterative
This closes #4098.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/834c5277
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/834c5277
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/834c5277
Branch: refs/heads/master
Commit: 834c527783346f777bf5def4c61f1791b2a89473
Parents: 343a804
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jun 9 12:53:41 2017 -0400
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 11:37:12 2017 +0200
----------------------------------------------------------------------
flink-runtime/pom.xml | 1 -
.../concurrent/BlockingBackChannel.java | 13 ++-
.../concurrent/BlockingBackChannelBroker.java | 8 +-
.../runtime/iterative/concurrent/Broker.java | 17 ++--
.../concurrent/IterationAggregatorBroker.java | 11 ++-
.../iterative/concurrent/SolutionSetBroker.java | 5 +-
.../SolutionSetUpdateBarrierBroker.java | 5 +-
.../iterative/concurrent/SuperstepBarrier.java | 22 +++--
.../concurrent/SuperstepKickoffLatch.java | 15 ++--
.../concurrent/SuperstepKickoffLatchBroker.java | 7 +-
.../WorksetEmptyConvergenceCriterion.java | 10 +--
.../iterative/event/AllWorkersDoneEvent.java | 11 ++-
.../event/IterationEventWithAggregators.java | 49 ++++++------
.../iterative/event/TerminationEvent.java | 9 +--
.../iterative/event/WorkerDoneEvent.java | 23 +++---
.../iterative/io/HashPartitionIterator.java | 12 +--
.../iterative/io/SerializedUpdateBuffer.java | 21 ++---
.../SolutionSetFastUpdateOutputCollector.java | 12 +--
...SolutionSetObjectsUpdateOutputCollector.java | 14 ++--
.../io/SolutionSetUpdateOutputCollector.java | 14 ++--
.../io/WorksetUpdateOutputCollector.java | 9 +--
.../iterative/task/AbstractIterativeTask.java | 56 ++++++-------
.../iterative/task/IterationHeadTask.java | 66 +++++++--------
.../task/IterationIntermediateTask.java | 23 +++---
.../task/IterationSynchronizationSinkTask.java | 52 ++++++------
.../iterative/task/IterationTailTask.java | 21 ++---
.../task/RuntimeAggregatorRegistry.java | 30 +++----
.../iterative/task/SyncEventHandler.java | 25 +++---
.../runtime/iterative/task/Terminable.java | 3 +-
.../concurrent/BlockingBackChannelTest.java | 21 ++---
.../iterative/concurrent/BrokerTest.java | 8 +-
.../iterative/concurrent/StringPair.java | 1 -
.../concurrent/SuperstepBarrierTest.java | 15 ++--
.../concurrent/SuperstepKickoffLatchTest.java | 84 ++++++++++----------
.../event/EventWithAggregatorsTest.java | 59 +++++++-------
35 files changed, 383 insertions(+), 369 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 654227a..df905dc 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -441,7 +441,6 @@ under the License.
**/runtime/highavailability/**,
**/runtime/instance/**,
**/runtime/io/**,
- **/runtime/iterative/**,
**/runtime/jobgraph/**,
**/runtime/jobmanager/**,
**/runtime/jobmaster/**,
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
index 067bbfe..1373654 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
@@ -16,27 +16,26 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
-import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
/**
* A concurrent datastructure that establishes a backchannel buffer between an iteration head
* and an iteration tail.
*/
public class BlockingBackChannel {
- /** buffer to send back the superstep results */
+ /** Buffer to send back the superstep results. */
private final SerializedUpdateBuffer buffer;
- /** a one element queue used for blocking hand over of the buffer */
+ /** A one element queue used for blocking hand over of the buffer. */
private final BlockingQueue<SerializedUpdateBuffer> queue;
public BlockingBackChannel(SerializedUpdateBuffer buffer) {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
index daa3ec3..120ecf1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
@@ -16,23 +16,19 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
/**
- * Singleton class for the threadsafe handover of {@link BlockingBackChannel}s from iteration heads to iteration tails
+ * Singleton class for the threadsafe handover of {@link BlockingBackChannel}s from iteration heads to iteration tails.
*/
public class BlockingBackChannelBroker extends Broker<BlockingBackChannel> {
- /**
- * Singleton instance
- */
private static final BlockingBackChannelBroker INSTANCE = new BlockingBackChannelBroker();
private BlockingBackChannelBroker() {}
/**
- * retrieve singleton instance
+ * Retrieve singleton instance.
*/
public static Broker<BlockingBackChannel> instance() {
return INSTANCE;
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
index 444d21f..6816af4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
import java.util.concurrent.ArrayBlockingQueue;
@@ -25,14 +24,14 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
- * A concurrent data structure that allows the hand-over of an object between a pair of threads
+ * A concurrent data structure that allows the hand-over of an object between a pair of threads.
*/
public class Broker<V> {
private final ConcurrentMap<String, BlockingQueue<V>> mediations = new ConcurrentHashMap<String, BlockingQueue<V>>();
/**
- * hand in the object to share
+ * Hand in the object to share.
*/
public void handIn(String key, V obj) {
if (!retrieveSharedQueue(key).offer(obj)) {
@@ -40,7 +39,7 @@ public class Broker<V> {
}
}
- /** blocking retrieval and removal of the object to share */
+ /** Blocking retrieval and removal of the object to share. */
public V getAndRemove(String key) {
try {
V objToShare = retrieveSharedQueue(key).take();
@@ -50,13 +49,13 @@ public class Broker<V> {
throw new RuntimeException(e);
}
}
-
- /** blocking retrieval and removal of the object to share */
+
+ /** Blocking retrieval and removal of the object to share. */
public void remove(String key) {
mediations.remove(key);
}
-
- /** blocking retrieval and removal of the object to share */
+
+ /** Blocking retrieval and removal of the object to share. */
public V get(String key) {
try {
BlockingQueue<V> queue = retrieveSharedQueue(key);
@@ -71,7 +70,7 @@ public class Broker<V> {
}
/**
- * thread-safe call to get a shared {@link BlockingQueue}
+ * Thread-safe call to get a shared {@link BlockingQueue}.
*/
private BlockingQueue<V> retrieveSharedQueue(String key) {
BlockingQueue<V> queue = mediations.get(key);
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
index bf509f6..249a492 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
@@ -16,17 +16,20 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
import org.apache.flink.runtime.iterative.task.RuntimeAggregatorRegistry;
+/**
+ * {@link Broker} for {@link RuntimeAggregatorRegistry}.
+ */
public class IterationAggregatorBroker extends Broker<RuntimeAggregatorRegistry> {
-
- /** single instance */
+
private static final IterationAggregatorBroker INSTANCE = new IterationAggregatorBroker();
- /** retrieve singleton instance */
+ /**
+ * Retrieve singleton instance.
+ */
public static IterationAggregatorBroker instance() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
index 3d9d0ea..f9ba0c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
@@ -23,9 +23,6 @@ package org.apache.flink.runtime.iterative.concurrent;
*/
public class SolutionSetBroker extends Broker<Object> {
- /**
- * Singleton instance
- */
private static final SolutionSetBroker INSTANCE = new SolutionSetBroker();
/**
@@ -34,6 +31,6 @@ public class SolutionSetBroker extends Broker<Object> {
public static Broker<Object> instance() {
return INSTANCE;
}
-
+
private SolutionSetBroker() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
index 352a262..9a1e40a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
@@ -22,15 +22,12 @@ import org.apache.flink.runtime.iterative.task.IterationHeadTask;
import org.apache.flink.runtime.iterative.task.IterationTailTask;
/**
- * Broker to hand over {@link SolutionSetUpdateBarrier} from
+ * Broker to hand over {@link SolutionSetUpdateBarrier} from
* {@link IterationHeadTask} to
* {@link IterationTailTask}.
*/
public class SolutionSetUpdateBarrierBroker extends Broker<SolutionSetUpdateBarrier> {
- /**
- * Singleton instance
- */
private static final SolutionSetUpdateBarrierBroker INSTANCE = new SolutionSetUpdateBarrierBroker();
private SolutionSetUpdateBarrierBroker() {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
index cc5d3c5..54bb870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
@@ -16,22 +16,21 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
-import java.util.concurrent.CountDownLatch;
-
import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
+import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.types.Value;
+import java.util.concurrent.CountDownLatch;
+
/**
* A resettable one-shot latch.
*/
public class SuperstepBarrier implements EventListener<TaskEvent> {
-
+
private final ClassLoader userCodeClassLoader;
private boolean terminationSignaled = false;
@@ -40,19 +39,17 @@ public class SuperstepBarrier implements EventListener<TaskEvent> {
private String[] aggregatorNames;
private Value[] aggregates;
-
-
+
public SuperstepBarrier(ClassLoader userCodeClassLoader) {
this.userCodeClassLoader = userCodeClassLoader;
}
-
- /** setup the barrier, has to be called at the beginning of each superstep */
+ /** Setup the barrier, has to be called at the beginning of each superstep. */
public void setup() {
latch = new CountDownLatch(1);
}
- /** wait on the barrier */
+ /** Wait on the barrier. */
public void waitForOtherWorkers() throws InterruptedException {
latch.await();
}
@@ -60,13 +57,12 @@ public class SuperstepBarrier implements EventListener<TaskEvent> {
public String[] getAggregatorNames() {
return aggregatorNames;
}
-
+
public Value[] getAggregates() {
return aggregates;
}
- /** barrier will release the waiting thread if an event occurs
- * @param event*/
+ /** Barrier will release the waiting thread if an event occurs. */
@Override
public void onEvent(TaskEvent event) {
if (event instanceof TerminationEvent) {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
index 83b7a4a..b86ade2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
@@ -18,14 +18,17 @@
package org.apache.flink.runtime.iterative.concurrent;
+/**
+ * Latch used to wait for the previous superstep to complete.
+ */
public class SuperstepKickoffLatch {
-
+
private final Object monitor = new Object();
-
+
private int superstepNumber = 1;
-
+
private boolean terminated;
-
+
public void triggerNextSuperstep() {
synchronized (monitor) {
if (terminated) {
@@ -35,14 +38,14 @@ public class SuperstepKickoffLatch {
monitor.notifyAll();
}
}
-
+
public void signalTermination() {
synchronized (monitor) {
terminated = true;
monitor.notifyAll();
}
}
-
+
public boolean awaitStartOfSuperstepOrTermination(int superstep) throws InterruptedException {
while (true) {
synchronized (monitor) {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
index f137680..3b545c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
@@ -18,13 +18,18 @@
package org.apache.flink.runtime.iterative.concurrent;
+/**
+ * {@link Broker} for {@link SuperstepKickoffLatch}.
+ */
public class SuperstepKickoffLatchBroker extends Broker<SuperstepKickoffLatch> {
private static final SuperstepKickoffLatchBroker INSTANCE = new SuperstepKickoffLatchBroker();
private SuperstepKickoffLatchBroker() {}
-
+ /**
+ * Retrieve the singleton instance.
+ */
public static Broker<SuperstepKickoffLatch> instance() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
index 2987b89..19c26bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
@@ -16,23 +16,23 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.convergence;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.types.LongValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
- * A workset iteration is by definition converged if no records have been updated in the solutionset
+ * A workset iteration is by definition converged if no records have been updated in the solutionset.
*/
public class WorksetEmptyConvergenceCriterion implements ConvergenceCriterion<LongValue> {
private static final long serialVersionUID = 1L;
private static final Logger log = LoggerFactory.getLogger(WorksetEmptyConvergenceCriterion.class);
-
+
public static final String AGGREGATOR_NAME = "pact.runtime.workset-empty-aggregator";
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
index 2fd0db1..e62bf78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
@@ -16,19 +16,22 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.event;
-import java.util.Map;
-
import org.apache.flink.api.common.aggregators.Aggregator;
+import java.util.Map;
+
+/**
+ * Event sent by the {@code IterationSynchronizationSinkTask} to each
+ * {@code IterationHead} signaling to start a new superstep.
+ */
public class AllWorkersDoneEvent extends IterationEventWithAggregators {
public AllWorkersDoneEvent() {
super();
}
-
+
public AllWorkersDoneEvent(Map<String, Aggregator<?>> aggregators) {
super(aggregators);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
index 4e1c19e..06b7687 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
@@ -18,11 +18,6 @@
package org.apache.flink.runtime.iterative.event;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Map;
-
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -32,13 +27,21 @@ import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Base class for iteration {@link TaskEvent} transmitting operator aggregators.
+ */
public abstract class IterationEventWithAggregators extends TaskEvent {
-
+
protected static final String[] NO_STRINGS = new String[0];
protected static final Value[] NO_VALUES = new Value[0];
-
+
private String[] aggNames;
-
+
private String[] classNames;
private byte[][] serializedData;
@@ -53,11 +56,11 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
if (aggregatorName == null || aggregate == null) {
throw new NullPointerException();
}
-
+
this.aggNames = new String[] { aggregatorName };
this.aggregates = new Value[] { aggregate };
}
-
+
protected IterationEventWithAggregators(Map<String, Aggregator<?>> aggregators) {
int num = aggregators.size();
if (num == 0) {
@@ -66,7 +69,7 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
} else {
this.aggNames = new String[num];
this.aggregates = new Value[num];
-
+
int i = 0;
for (Map.Entry<String, Aggregator<?>> entry : aggregators.entrySet()) {
this.aggNames[i] = entry.getKey();
@@ -75,7 +78,7 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
}
}
}
-
+
public String[] getAggregatorNames() {
return this.aggNames;
}
@@ -97,21 +100,19 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
catch (ClassCastException e) {
throw new RuntimeException("User-defined aggregator class is not a value sublass.");
}
-
-
+
try (DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(
- new ByteArrayInputStream(serializedData[i])))
- {
+ new ByteArrayInputStream(serializedData[i]))) {
v.read(in);
}
catch (IOException e) {
throw new RuntimeException("Error while deserializing the user-defined aggregate class.", e);
}
-
+
aggregates[i] = v;
}
}
-
+
return this.aggregates;
}
@@ -119,15 +120,15 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
public void write(DataOutputView out) throws IOException {
int num = this.aggNames.length;
out.writeInt(num);
-
+
ByteArrayOutputStream boas = new ByteArrayOutputStream();
DataOutputViewStreamWrapper bufferStream = new DataOutputViewStreamWrapper(boas);
-
+
for (int i = 0; i < num; i++) {
// aggregator name and type
out.writeUTF(this.aggNames[i]);
out.writeUTF(this.aggregates[i].getClass().getName());
-
+
// aggregator value indirect as a byte array
this.aggregates[i].write(bufferStream);
bufferStream.flush();
@@ -160,14 +161,14 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
for (int i = 0; i < num; i++) {
this.aggNames[i] = in.readUTF();
this.classNames[i] = in.readUTF();
-
+
int len = in.readInt();
byte[] data = new byte[len];
this.serializedData[i] = data;
in.readFully(data);
-
+
}
-
+
this.aggregates = null;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
index 28181e8..f1523df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
@@ -16,22 +16,21 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.event;
-import java.io.IOException;
-
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.event.TaskEvent;
+import java.io.IOException;
+
/**
- * Signals that the iteration is completely executed, participating tasks must terminate now
+ * Signals that the iteration is completely executed, participating tasks must terminate now.
*/
public class TerminationEvent extends TaskEvent {
public static final TerminationEvent INSTANCE = new TerminationEvent();
-
+
@Override
public void write(DataOutputView out) throws IOException {}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
index 2e348e9..ce24f23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
@@ -16,21 +16,24 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.event;
-import java.io.IOException;
-import java.util.Map;
-
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Completion event sent from each {@code IterationHead} to the
+ * {@code IterationSynchronizationSinkTask}.
+ */
public class WorkerDoneEvent extends IterationEventWithAggregators {
-
+
private int workerIndex;
-
+
public WorkerDoneEvent() {
super();
}
@@ -39,22 +42,22 @@ public class WorkerDoneEvent extends IterationEventWithAggregators {
super(aggregatorName, aggregate);
this.workerIndex = workerIndex;
}
-
+
public WorkerDoneEvent(int workerIndex, Map<String, Aggregator<?>> aggregators) {
super(aggregators);
this.workerIndex = workerIndex;
}
-
+
public int getWorkerIndex() {
return workerIndex;
}
-
+
@Override
public void write(DataOutputView out) throws IOException {
out.writeInt(this.workerIndex);
super.write(out);
}
-
+
@Override
public void read(DataInputView in) throws IOException {
this.workerIndex = in.readInt();
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
index 93ae55f..5bd3131 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
@@ -18,17 +18,17 @@
package org.apache.flink.runtime.iterative.io;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Iterator;
-
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.hash.HashPartition;
import org.apache.flink.util.MutableObjectIterator;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Iterator;
+
/**
- * {@link Iterator} over the build side entries of a {@link HashPartition}
- *
+ * {@link Iterator} over the build side entries of a {@link HashPartition}.
+ *
* @param <BT>
*/
public class HashPartitionIterator<BT, PT> implements MutableObjectIterator<BT> {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
index 7776894..353dcca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
@@ -16,9 +16,16 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.io;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.AbstractPagedInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayDeque;
@@ -28,14 +35,10 @@ import java.util.Deque;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
-
+/**
+ * {@link AbstractPagedOutputView} used by the {@code BlockingBackChannel} for
+ * transmitting superstep results.
+ */
public class SerializedUpdateBuffer extends AbstractPagedOutputView {
private static final int HEADER_LENGTH = 4;
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
index f326d89..10962d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
@@ -18,17 +18,17 @@
package org.apache.flink.runtime.iterative.io;
-import java.io.IOException;
-
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.util.Collector;
+import java.io.IOException;
+
/**
* A {@link Collector} to update the solution set of a workset iteration.
- * <p>
- * The records are written to a HashTable hash table to allow in-memory point updates.
- * <p>
- * Assumption for fast updates: the build side iterator of the hash table is already positioned for the update. This
+ *
+ * <p>The records are written to a hash table to allow in-memory point updates.
+ *
+ * <p>Assumption for fast updates: the build side iterator of the hash table is already positioned for the update. This
* is for example the case when a solution set update happens directly after a solution set join. If this assumption
* doesn't hold, use {@link SolutionSetUpdateOutputCollector}, which probes the hash table before updating.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
index 21a9cc8..6a57dfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
@@ -24,12 +24,12 @@ import org.apache.flink.util.Collector;
/**
* A {@link Collector} to update the solution set of a workset iteration.
- * <p>
- * The records are written to a HashTable hash table to allow in-memory point updates.
- * <p>
- * Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
+ *
+ * <p>The records are written to a HashTable hash table to allow in-memory point updates.
+ *
+ * <p>Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
* already positioned for the update, use {@link SolutionSetFastUpdateOutputCollector} to the save re-probing.
- *
+ *
* @see SolutionSetFastUpdateOutputCollector
*/
public class SolutionSetObjectsUpdateOutputCollector<T> implements Collector<T> {
@@ -37,9 +37,9 @@ public class SolutionSetObjectsUpdateOutputCollector<T> implements Collector<T>
private final Collector<T> delegate;
private final JoinHashMap<T> hashMap;
-
+
private final TypeSerializer<T> serializer;
-
+
public SolutionSetObjectsUpdateOutputCollector(JoinHashMap<T> hashMap) {
this(hashMap, null);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
index c39efa5..90041c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
@@ -18,19 +18,19 @@
package org.apache.flink.runtime.iterative.io;
-import java.io.IOException;
-
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.util.Collector;
+import java.io.IOException;
+
/**
* A {@link Collector} to update the solution set of a workset iteration.
- * <p>
- * The records are written to a HashTable hash table to allow in-memory point updates.
- * <p>
- * Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
+ *
+ * <p>The records are written to a HashTable hash table to allow in-memory point updates.
+ *
+ * <p>Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
* already positioned for the update, use {@link SolutionSetFastUpdateOutputCollector} to the save re-probing.
- *
+ *
* @see SolutionSetFastUpdateOutputCollector
*/
public class SolutionSetUpdateOutputCollector<T> implements Collector<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
index ad1d274..329e768 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
@@ -16,19 +16,18 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.io;
-import java.io.IOException;
-
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Collector;
+import java.io.IOException;
+
/**
* A {@link Collector} to update the iteration workset (partial solution for bulk iterations).
- * <p>
- * The records are written to a {@link DataOutputView} to allow in-memory data exchange.
+ *
+ * <p>The records are written to a {@link DataOutputView} to allow in-memory data exchange.
*/
public class WorksetUpdateOutputCollector<T> implements Collector<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
index 047fd7e..bde358c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
@@ -21,11 +21,6 @@ package org.apache.flink.runtime.iterative.task;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.operators.BatchTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.functions.Function;
@@ -33,7 +28,9 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.operators.util.JoinHashMap;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
@@ -45,6 +42,7 @@ import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCri
import org.apache.flink.runtime.iterative.io.SolutionSetObjectsUpdateOutputCollector;
import org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector;
import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
+import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.ResettableDriver;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
@@ -55,6 +53,9 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
@@ -64,10 +65,9 @@ import java.util.concurrent.Future;
* The abstract base class for all tasks able to participate in an iteration.
*/
public abstract class AbstractIterativeTask<S extends Function, OT> extends BatchTask<S, OT>
- implements Terminable
-{
+ implements Terminable {
private static final Logger log = LoggerFactory.getLogger(AbstractIterativeTask.class);
-
+
protected LongSumAggregator worksetAggregator;
protected BlockingBackChannel worksetBackChannel;
@@ -77,14 +77,13 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
protected boolean isWorksetUpdate;
protected boolean isSolutionSetUpdate;
-
private RuntimeAggregatorRegistry iterationAggregators;
private String brokerKey;
private int superstepNum = 1;
-
+
private volatile boolean terminationRequested;
// --------------------------------------------------------------------------------------------
@@ -105,7 +104,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
}
}
}
-
+
TaskConfig config = getLastTasksConfig();
isWorksetIteration = config.getIsWorksetIteration();
isWorksetUpdate = config.getIsWorksetUpdate();
@@ -134,7 +133,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
} else {
reinstantiateDriver();
resetAllInputs();
-
+
// re-read the iterative broadcast variables
for (int i : this.iterativeBroadcastInputs) {
final String name = getTaskConfig().getBroadcastInputName(i);
@@ -144,7 +143,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
// call the parent to execute the superstep
super.run();
-
+
// release the iterative broadcast variables
for (int i : this.iterativeBroadcastInputs) {
final String name = getTaskConfig().getBroadcastInputName(i);
@@ -244,8 +243,9 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
@SuppressWarnings("unchecked")
MutableObjectIterator<Object> inIter = (MutableObjectIterator<Object>) this.inputIterators[inputNum];
Object o = this.inputSerializers[inputNum].getSerializer().createInstance();
- while ((o = inIter.next(o)) != null);
-
+ while ((o = inIter.next(o)) != null) {
+ }
+
if (!reader.isFinished()) {
// also reset the end-of-superstep state
reader.startNextSuperstep();
@@ -253,17 +253,17 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
}
}
}
-
+
for (int inputNum : this.iterativeBroadcastInputs) {
MutableReader<?> reader = this.broadcastInputReaders[inputNum];
if (!reader.isFinished()) {
-
+
// sanity check that the BC input is at the end of the superstep
if (!reader.hasReachedEndOfSuperstep()) {
throw new IllegalStateException("An iterative broadcast input has not been fully consumed.");
}
-
+
reader.startNextSuperstep();
}
}
@@ -291,11 +291,11 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
/**
* Creates a new {@link WorksetUpdateOutputCollector}.
- * <p>
- * This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
+ *
+ * <p>This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
* workset.
- * <p>
- * If a non-null delegate is given, the new {@link Collector} will write to the solution set and also call
+ *
+ * <p>If a non-null delegate is given, the new {@link Collector} will write to the solution set and also call
* collect(T) of the delegate.
*
* @param delegate null -OR- the delegate on which to call collect() by the newly created collector
@@ -313,13 +313,13 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
/**
* Creates a new solution set update output collector.
- * <p>
- * This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
+ *
+ * <p>This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
* solution set of workset iterations. Depending on the task configuration, either a fast (non-probing)
* {@link org.apache.flink.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or normal (re-probing)
* {@link SolutionSetUpdateOutputCollector} is created.
- * <p>
- * If a non-null delegate is given, the new {@link Collector} will write back to the solution set and also call
+ *
+ * <p>If a non-null delegate is given, the new {@link Collector} will write back to the solution set and also call
* collect(T) of the delegate.
*
* @param delegate null -OR- a delegate collector to be called by the newly created collector
@@ -328,7 +328,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
*/
protected Collector<OT> createSolutionSetUpdateOutputCollector(Collector<OT> delegate) {
Broker<Object> solutionSetBroker = SolutionSetBroker.instance();
-
+
Object ss = solutionSetBroker.get(brokerKey());
if (ss instanceof CompactingHashTable) {
@SuppressWarnings("unchecked")
@@ -363,7 +363,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext {
public IterativeRuntimeUdfContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
- Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulatorMap, MetricGroup metrics) {
+ Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulatorMap, MetricGroup metrics) {
super(taskInfo, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap, metrics);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 575072d..b673ba0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -18,18 +18,6 @@
package org.apache.flink.runtime.iterative.task;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.operators.Driver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.util.JoinHashMap;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -39,6 +27,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
import org.apache.flink.runtime.iterative.concurrent.Broker;
@@ -54,12 +46,20 @@ import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* The head is responsible for coordinating an iteration and can run a
* {@link Driver} inside. It will read
@@ -71,11 +71,11 @@ import org.apache.flink.util.MutableObjectIterator;
* the second iteration, the input for the head is the output of the tail, transmitted through the backchannel. Once the
* iteration is done, the head
* will send a {@link TerminationEvent} to all it's connected tasks, signaling them to shutdown.
- * <p>
- * Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the
+ *
+ * <p>Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the
* step function. - The next m output gates to to the tasks that consume the final solution. - The last output gate
* connects to the synchronization task.
- *
+ *
* @param <X>
* The type of the bulk partial solution / solution set and the final output.
* @param <Y>
@@ -131,8 +131,8 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
}
/**
- * the iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and
- * hands it to the iteration tail via a {@link Broker} singleton
+ * The iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and
+ * hands it to the iteration tail via a {@link Broker} singleton.
**/
private BlockingBackChannel initBackChannel() throws Exception {
@@ -154,7 +154,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
return backChannel;
}
-
+
private <BT> CompactingHashTable<BT> initCompactingHashTable() throws Exception {
// get some memory
double hashjoinMemorySize = config.getRelativeSolutionSetMemory();
@@ -162,7 +162,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer(userCodeClassLoader);
TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator(userCodeClassLoader);
-
+
TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
@@ -194,27 +194,27 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
}
}
}
-
+
private <BT> JoinHashMap<BT> initJoinHashMap() {
TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer
(getUserCodeClassLoader());
TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator
(getUserCodeClassLoader());
-
+
TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
-
+
return new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
}
-
+
private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
solutionSet.open();
solutionSet.buildTableWithUniqueKey(solutionSetInput);
}
-
+
private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
TypeSerializer<X> serializer = solutionTypeSerializer.getSerializer();
-
+
X next;
while ((next = solutionSetInput.next(serializer.createInstance())) != null) {
solutionSet.insertOrReplace(next);
@@ -232,12 +232,12 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
public void run() throws Exception {
final String brokerKey = brokerKey();
final int workerIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
-
+
final boolean objectSolutionSet = config.isSolutionSetUnmanaged();
CompactingHashTable<X> solutionSet = null; // if workset iteration
JoinHashMap<X> solutionSetObjectMap = null; // if workset iteration with unmanaged solution set
-
+
boolean waitForSolutionSetUpdate = config.getWaitForSolutionSetUpdate();
boolean isWorksetIteration = config.getIsWorksetIteration();
@@ -245,7 +245,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
/* used for receiving the current iteration result from iteration tail */
SuperstepKickoffLatch nextStepKickoff = new SuperstepKickoffLatch();
SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff);
-
+
BlockingBackChannel backChannel = initBackChannel();
SuperstepBarrier barrier = initSuperstepBarrier();
SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;
@@ -262,7 +262,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
// setup the index for the solution set
@SuppressWarnings("unchecked")
MutableObjectIterator<X> solutionSetInput = (MutableObjectIterator<X>) createInputIterator(inputReaders[initialSolutionSetInput], solutionTypeSerializer);
-
+
// read the initial solution set
if (objectSolutionSet) {
solutionSetObjectMap = initJoinHashMap();
@@ -284,7 +284,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
@SuppressWarnings("unchecked")
TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) feedbackTypeSerializer;
solutionTypeSerializer = solSer;
-
+
// = termination Criterion tail
if (waitForSolutionSetUpdate) {
solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
@@ -352,7 +352,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
String[] globalAggregateNames = barrier.getAggregatorNames();
Value[] globalAggregates = barrier.getAggregates();
aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
-
+
nextStepKickoff.triggerNextSuperstep();
}
}
@@ -398,7 +398,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
out.collect(record);
}
}
-
+
private void streamSolutionSetToFinalOutput(CompactingHashTable<X> hashTable) throws IOException {
final MutableObjectIterator<X> results = hashTable.getEntryIterator();
final Collector<X> output = this.finalOutputCollector;
@@ -408,7 +408,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
output.collect(record);
}
}
-
+
@SuppressWarnings("unchecked")
private void streamSolutionSetToFinalOutput(JoinHashMap<X> soluionSet) throws IOException {
final Collector<X> output = this.finalOutputCollector;
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
index 16a7008..c5fd133 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
@@ -19,14 +19,15 @@
package org.apache.flink.runtime.iterative.task;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
import org.apache.flink.util.Collector;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,11 +35,11 @@ import java.io.IOException;
/**
* An intermediate iteration task, which runs a {@link org.apache.flink.runtime.operators.Driver} inside.
- * <p>
- * It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to its connected tasks. Furthermore
+ *
+ * <p>It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to its connected tasks. Furthermore
* intermediate tasks can also update the iteration state, either the workset or the solution set.
- * <p>
- * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadTask} via
+ *
+ * <p>If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadTask} via
* a {@link BlockingBackChannel} for the workset -XOR- a HashTable for the solution set. In this case
* this task must be scheduled on the same instance as the head.
*/
@@ -64,7 +65,7 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
if (isSolutionSetUpdate) {
throw new IllegalStateException("Plan bug: Intermediate task performs workset and solutions set update.");
}
-
+
Collector<OT> outputCollector = createWorksetUpdateOutputCollector(delegate);
// we need the WorksetUpdateOutputCollector separately to count the collected elements
@@ -80,7 +81,7 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
@Override
public void run() throws Exception {
-
+
SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
while (this.running && !terminationRequested()) {
@@ -98,19 +99,19 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
worksetAggregator.aggregate(numCollected);
}
-
+
if (log.isInfoEnabled()) {
log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
}
-
+
// let the successors know that the end of this superstep data is reached
sendEndOfSuperstep();
-
+
if (isWorksetUpdate) {
// notify iteration head if responsible for workset update
worksetBackChannel.notifyOfEndOfSuperstep();
}
-
+
boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
if (terminated) {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 11a8cfa..6a38fcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -16,31 +16,31 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.task;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.operators.BatchTask;
-import org.apache.flink.types.IntValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.types.IntValue;
import org.apache.flink.types.Value;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* The task responsible for synchronizing all iteration heads, implemented as an output task. This task
* will never see any data.
@@ -52,13 +52,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
private static final Logger log = LoggerFactory.getLogger(IterationSynchronizationSinkTask.class);
private MutableRecordReader<IntValue> headEventReader;
-
+
private SyncEventHandler eventHandler;
private ConvergenceCriterion<Value> convergenceCriterion;
private ConvergenceCriterion<Value> implicitConvergenceCriterion;
-
+
private Map<String, Aggregator<?>> aggregators;
private String convergenceAggregatorName;
@@ -66,13 +66,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
private String implicitConvergenceAggregatorName;
private int currentIteration = 1;
-
+
private int maxNumberOfIterations;
private final AtomicBoolean terminated = new AtomicBoolean(false);
// --------------------------------------------------------------------------------------------
-
+
@Override
public void invoke() throws Exception {
this.headEventReader = new MutableRecordReader<>(
@@ -80,13 +80,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
getEnvironment().getTaskManagerInfo().getTmpDirectories());
TaskConfig taskConfig = new TaskConfig(getTaskConfiguration());
-
+
// store all aggregators
this.aggregators = new HashMap<>();
for (AggregatorWithName<?> aggWithName : taskConfig.getIterationAggregators(getUserCodeClassLoader())) {
aggregators.put(aggWithName.getName(), aggWithName.getAggregator());
}
-
+
// store the aggregator convergence criterion
if (taskConfig.usesConvergenceCriterion()) {
convergenceCriterion = taskConfig.getConvergenceCriterion(getUserCodeClassLoader());
@@ -100,9 +100,9 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
implicitConvergenceAggregatorName = taskConfig.getImplicitConvergenceCriterionAggregatorName();
Preconditions.checkNotNull(implicitConvergenceAggregatorName);
}
-
+
maxNumberOfIterations = taskConfig.getNumberOfIterations();
-
+
// set up the event handler
int numEventsTillEndOfSuperstep = taskConfig.getNumberOfEventsUntilInterruptInIterativeGate(0);
eventHandler = new SyncEventHandler(numEventsTillEndOfSuperstep, aggregators,
@@ -110,7 +110,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
headEventReader.registerTaskEventListener(eventHandler, WorkerDoneEvent.class);
IntValue dummy = new IntValue();
-
+
while (!terminationRequested()) {
if (log.isInfoEnabled()) {
@@ -140,7 +140,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
AllWorkersDoneEvent allWorkersDoneEvent = new AllWorkersDoneEvent(aggregators);
sendToAllWorkers(allWorkersDoneEvent);
-
+
// reset all aggregators
for (Aggregator<?> agg : aggregators.values()) {
agg.reset();
@@ -165,7 +165,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
if (aggregator == null) {
throw new RuntimeException("Error: Aggregator for convergence criterion was null.");
}
-
+
Value aggregate = aggregator.getAggregate();
if (convergenceCriterion.isConverged(currentIteration, aggregate)) {
@@ -194,14 +194,14 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
return true;
}
}
-
+
return false;
}
private void readHeadEventChannel(IntValue rec) throws IOException {
// reset the handler
eventHandler.resetEndOfSuperstep();
-
+
// read (and thereby process all events in the handler's event handling functions)
try {
if (this.headEventReader.next(rec)) {
@@ -222,9 +222,9 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
private String formatLogString(String message) {
return BatchTask.constructLogString(message, getEnvironment().getTaskInfo().getTaskName(), this);
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public boolean terminationRequested() {
return terminated.get();
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
index 9e0b560..3ec3a8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.iterative.task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
@@ -28,15 +26,18 @@ import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker
import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* An iteration tail, which runs a driver inside.
- * <p>
- * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadTask} via
+ *
+ * <p>If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadTask} via
* a BackChannel for the workset -OR- a HashTable for the solution set. Therefore this
* task must be scheduled on the same instance as the head. It's also possible for the tail to update *both* the workset
* and the solution set.
- * <p>
- * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
+ *
+ * <p>If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
*/
public class IterationTailTask<S extends Function, OT> extends AbstractIterativeTask<S, OT> {
@@ -45,7 +46,6 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
private SolutionSetUpdateBarrier solutionSetUpdateBarrier;
private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
-
@Override
protected void initialize() throws Exception {
@@ -80,6 +80,7 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
outputCollector = new Collector<OT>() {
@Override
public void collect(OT record) {}
+
@Override
public void close() {}
};
@@ -95,9 +96,9 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
@Override
public void run() throws Exception {
-
+
SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
-
+
while (this.running && !terminationRequested()) {
if (log.isInfoEnabled()) {
@@ -119,7 +120,7 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
if (log.isInfoEnabled()) {
log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
}
-
+
if (isWorksetUpdate) {
// notify iteration head if responsible for workset update
worksetBackChannel.notifyOfEndOfSuperstep();
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
index 7beb4c7..c4a30c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
@@ -18,56 +18,56 @@
package org.apache.flink.runtime.iterative.task;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
import org.apache.flink.types.Value;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
/**
*
*/
public class RuntimeAggregatorRegistry {
-
+
private final Map<String, Aggregator<?>> aggregators;
-
+
private final Map<String, Value> previousGlobalAggregate;
-
+
public RuntimeAggregatorRegistry(Collection<AggregatorWithName<?>> aggs) {
this.aggregators = new HashMap<String, Aggregator<?>>();
this.previousGlobalAggregate = new HashMap<String, Value>();
-
+
for (AggregatorWithName<?> agg : aggs) {
this.aggregators.put(agg.getName(), agg.getAggregator());
}
}
-
+
public Value getPreviousGlobalAggregate(String name) {
return this.previousGlobalAggregate.get(name);
}
-
+
@SuppressWarnings("unchecked")
public <T extends Aggregator<?>> T getAggregator(String name) {
return (T) this.aggregators.get(name);
}
-
+
public Map<String, Aggregator<?>> getAllAggregators() {
return this.aggregators;
}
-
+
public void updateGlobalAggregatesAndReset(String[] names, Value[] aggregates) {
if (names == null || aggregates == null || names.length != aggregates.length) {
throw new IllegalArgumentException();
}
-
+
// add global aggregates
- for (int i = 0 ; i < names.length; i++) {
+ for (int i = 0; i < names.length; i++) {
this.previousGlobalAggregate.put(names[i], aggregates[i]);
}
-
+
// reset all aggregators
for (Aggregator<?> agg : this.aggregators.values()) {
agg.reset();
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
index 71c15b1..45843ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
@@ -18,27 +18,30 @@
package org.apache.flink.runtime.iterative.task;
-import java.util.Map;
-
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
+import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.types.Value;
import org.apache.flink.util.Preconditions;
+import java.util.Map;
+
+/**
+ * Listener for {@link WorkerDoneEvent} which also aggregates all aggregators
+ * from iteration tasks and signals the end of the superstep.
+ */
public class SyncEventHandler implements EventListener<TaskEvent> {
-
+
private final ClassLoader userCodeClassLoader;
-
+
private final Map<String, Aggregator<?>> aggregators;
private final int numberOfEventsUntilEndOfSuperstep;
private int workerDoneEventCounter;
-
- private boolean endOfSuperstep;
+ private boolean endOfSuperstep;
public SyncEventHandler(int numberOfEventsUntilEndOfSuperstep, Map<String, Aggregator<?>> aggregators, ClassLoader userCodeClassLoader) {
Preconditions.checkArgument(numberOfEventsUntilEndOfSuperstep > 0);
@@ -60,7 +63,7 @@ public class SyncEventHandler implements EventListener<TaskEvent> {
if (this.endOfSuperstep) {
throw new RuntimeException("Encountered WorderDoneEvent when still in End-of-Superstep status.");
}
-
+
workerDoneEventCounter++;
String[] aggNames = workerDoneEvent.getAggregatorNames();
@@ -69,7 +72,7 @@ public class SyncEventHandler implements EventListener<TaskEvent> {
if (aggNames.length != aggregates.length) {
throw new RuntimeException("Inconsistent WorkerDoneEvent received!");
}
-
+
for (int i = 0; i < aggNames.length; i++) {
@SuppressWarnings("unchecked")
Aggregator<Value> aggregator = (Aggregator<Value>) this.aggregators.get(aggNames[i]);
@@ -81,11 +84,11 @@ public class SyncEventHandler implements EventListener<TaskEvent> {
Thread.currentThread().interrupt();
}
}
-
+
public boolean isEndOfSuperstep() {
return this.endOfSuperstep;
}
-
+
public void resetEndOfSuperstep() {
this.endOfSuperstep = false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
index d6f7544..9d952ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
@@ -16,11 +16,10 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.task;
/**
- * Models the functionality that the termination of an iterative task can be requested from outside
+ * Models the functionality that the termination of an iterative task can be requested from outside.
*/
public interface Terminable {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
index d22a23b..c9015e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
@@ -16,25 +16,26 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
+
+import com.google.common.collect.Lists;
import org.junit.Test;
import org.mockito.Mockito;
-import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link BlockingBackChannel}.
+ */
public class BlockingBackChannelTest {
private static final int NUM_ITERATIONS = 3;
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
index 3f2c243..e12cb32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
@@ -16,14 +16,13 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
+import org.apache.flink.util.Preconditions;
+
import com.google.common.collect.Lists;
import org.junit.Test;
-import org.apache.flink.util.Preconditions;
-
import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -35,6 +34,9 @@ import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link Broker}.
+ */
public class BrokerTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
index 798d9f5..82ddac7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
class StringPair {
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
index 2f26670..1187adf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
@@ -16,19 +16,22 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.concurrent;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Random;
-
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
+
import org.junit.Test;
+import java.util.Random;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link SuperstepBarrier}.
+ */
public class SuperstepBarrierTest {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
index 8c8deb2..90555d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
@@ -21,30 +21,33 @@ package org.apache.flink.runtime.iterative.concurrent;
import org.junit.Assert;
import org.junit.Test;
+/**
+ * Tests for {@link SuperstepKickoffLatch}.
+ */
public class SuperstepKickoffLatchTest {
@Test
public void testWaitFromOne() {
try {
SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
-
+
Waiter w = new Waiter(latch, 2);
Thread waiter = new Thread(w);
waiter.setDaemon(true);
waiter.start();
-
+
WatchDog wd = new WatchDog(waiter, 2000);
wd.start();
-
+
Thread.sleep(100);
-
+
latch.triggerNextSuperstep();
-
+
wd.join();
if (wd.getError() != null) {
throw wd.getError();
}
-
+
if (w.getError() != null) {
throw w.getError();
}
@@ -54,28 +57,28 @@ public class SuperstepKickoffLatchTest {
Assert.fail("Error: " + t.getMessage());
}
}
-
+
@Test
public void testWaitAlreadyFulfilled() {
try {
SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
latch.triggerNextSuperstep();
-
+
Waiter w = new Waiter(latch, 2);
Thread waiter = new Thread(w);
waiter.setDaemon(true);
waiter.start();
-
+
WatchDog wd = new WatchDog(waiter, 2000);
wd.start();
-
+
Thread.sleep(100);
-
+
wd.join();
if (wd.getError() != null) {
throw wd.getError();
}
-
+
if (w.getError() != null) {
throw w.getError();
}
@@ -85,14 +88,14 @@ public class SuperstepKickoffLatchTest {
Assert.fail("Error: " + t.getMessage());
}
}
-
+
@Test
public void testWaitIncorrect() {
try {
SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
latch.triggerNextSuperstep();
latch.triggerNextSuperstep();
-
+
try {
latch.awaitStartOfSuperstepOrTermination(2);
Assert.fail("should throw exception");
@@ -106,29 +109,29 @@ public class SuperstepKickoffLatchTest {
Assert.fail("Error: " + e.getMessage());
}
}
-
+
@Test
public void testWaitIncorrectAsync() {
try {
SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
latch.triggerNextSuperstep();
latch.triggerNextSuperstep();
-
+
Waiter w = new Waiter(latch, 2);
Thread waiter = new Thread(w);
waiter.setDaemon(true);
waiter.start();
-
+
WatchDog wd = new WatchDog(waiter, 2000);
wd.start();
-
+
Thread.sleep(100);
-
+
wd.join();
if (wd.getError() != null) {
throw wd.getError();
}
-
+
if (w.getError() != null) {
if (!(w.getError() instanceof IllegalStateException)) {
throw new Exception("wrong exception type " + w.getError());
@@ -142,29 +145,29 @@ public class SuperstepKickoffLatchTest {
Assert.fail("Error: " + t.getMessage());
}
}
-
+
@Test
public void testWaitForTermination() {
try {
SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
latch.triggerNextSuperstep();
latch.triggerNextSuperstep();
-
+
Waiter w = new Waiter(latch, 4);
Thread waiter = new Thread(w);
waiter.setDaemon(true);
waiter.start();
-
+
WatchDog wd = new WatchDog(waiter, 2000);
wd.start();
-
+
latch.signalTermination();
-
+
wd.join();
if (wd.getError() != null) {
throw wd.getError();
}
-
+
if (w.getError() != null) {
throw w.getError();
}
@@ -174,16 +177,15 @@ public class SuperstepKickoffLatchTest {
Assert.fail("Error: " + t.getMessage());
}
}
-
+
private static class Waiter implements Runnable {
private final SuperstepKickoffLatch latch;
-
+
private final int waitFor;
-
+
private volatile Throwable error;
-
-
+
public Waiter(SuperstepKickoffLatch latch, int waitFor) {
this.latch = latch;
this.waitFor = waitFor;
@@ -198,37 +200,37 @@ public class SuperstepKickoffLatchTest {
this.error = t;
}
}
-
+
public Throwable getError() {
return error;
}
}
-
+
private static class WatchDog extends Thread {
-
+
private final Thread toWatch;
-
+
private final long timeOut;
-
+
private volatile Throwable failed;
-
+
public WatchDog(Thread toWatch, long timeout) {
setDaemon(true);
setName("Watchdog");
this.toWatch = toWatch;
this.timeOut = timeout;
}
-
+
@SuppressWarnings("deprecation")
@Override
public void run() {
try {
toWatch.join(timeOut);
-
+
if (toWatch.isAlive()) {
this.failed = new Exception("timed out");
toWatch.interrupt();
-
+
toWatch.join(2000);
if (toWatch.isAlive()) {
toWatch.stop();
@@ -239,7 +241,7 @@ public class SuperstepKickoffLatchTest {
failed = t;
}
}
-
+
public Throwable getError() {
return failed;
}