You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/23 12:58:38 UTC

[1/6] flink git commit: [FLINK-5118] [metrics] Fix inconsistent numBytesIn/Out metrics for network channels

Repository: flink
Updated Branches:
  refs/heads/release-1.2 1d6b8bbce -> 940b4900d


[FLINK-5118] [metrics] Fix inconsistent numBytesIn/Out metrics for network channels


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/940b4900
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/940b4900
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/940b4900

Branch: refs/heads/release-1.2
Commit: 940b4900d1cfc19b32af2b3e8753d3b6e01622ee
Parents: 3c943e6
Author: zentol <ch...@apache.org>
Authored: Thu Jan 12 14:59:22 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100

----------------------------------------------------------------------
 .../api/serialization/SpanningRecordSerializer.java      |  8 --------
 .../runtime/io/network/api/writer/RecordWriter.java      | 11 ++++++++---
 2 files changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/940b4900/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index a8fe3fe..57ed06f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -24,7 +24,6 @@ import java.nio.ByteOrder;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
@@ -52,8 +51,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	/** Limit of current {@link MemorySegment} of target buffer */
 	private int limit;
 
-	private transient Counter numBytesOut;
-
 	public SpanningRecordSerializer() {
 		this.serializationBuffer = new DataOutputSerializer(128);
 
@@ -81,10 +78,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 
 		int len = this.serializationBuffer.length();
 		this.lengthBuffer.putInt(0, len);
-		
-		if (numBytesOut != null) {
-			numBytesOut.inc(len);
-		}
 
 		this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
 
@@ -187,6 +180,5 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 
 	@Override
 	public void instantiateMetrics(TaskIOMetricGroup metrics) {
-		numBytesOut = metrics.getNumBytesOutCounter();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/940b4900/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 799187d..f9671ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -57,6 +59,8 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 	private final Random RNG = new XORShiftRandom();
 
+	private Counter numBytesOut = new SimpleCounter();
+
 	public RecordWriter(ResultPartitionWriter writer) {
 		this(writer, new RoundRobinChannelSelector<T>());
 	}
@@ -112,6 +116,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 				Buffer buffer = serializer.getCurrentBuffer();
 
 				if (buffer != null) {
+					numBytesOut.inc(buffer.getSize());
 					writeAndClearBuffer(buffer, targetChannel, serializer);
 
 					// If this was a full record, we are done. Not breaking
@@ -137,6 +142,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 			synchronized (serializer) {
 				Buffer buffer = serializer.getCurrentBuffer();
 				if (buffer != null) {
+					numBytesOut.inc(buffer.getSize());
 					writeAndClearBuffer(buffer, targetChannel, serializer);
 				} else if (serializer.hasData()) {
 					throw new IllegalStateException("No buffer, but serializer has buffered data.");
@@ -171,6 +177,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 					Buffer buffer = serializer.getCurrentBuffer();
 
 					if (buffer != null) {
+						numBytesOut.inc(buffer.getSize());
 						writeAndClearBuffer(buffer, targetChannel, serializer);
 					}
 				} finally {
@@ -202,9 +209,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 	 * @param metrics
      */
 	public void setMetricGroup(TaskIOMetricGroup metrics) {
-		for(RecordSerializer<?> serializer : serializers) {
-			serializer.instantiateMetrics(metrics);
-		}
+		numBytesOut = metrics.getNumBytesOutCounter();
 	}
 
 	/**


[6/6] flink git commit: [hotfix] [gelly] Fix TriangleListing EdgeOrder

Posted by se...@apache.org.
[hotfix] [gelly] Fix TriangleListing EdgeOrder

The edge bitmask was swapped in ed09dba.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d3bfa2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d3bfa2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d3bfa2f

Branch: refs/heads/release-1.2
Commit: 7d3bfa2fdfd9e3c30ce57309c12d5c2cff2a0f1c
Parents: feeb9d7
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jan 20 13:33:01 2017 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100

----------------------------------------------------------------------
 .../clustering/directed/TriangleListing.java    | 24 +++++++++-----------
 1 file changed, 11 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d3bfa2f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index e1b3040..a1006c4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -443,19 +443,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		}
 
 		private String maskToString(byte mask, int shift) {
-			switch((mask >>> shift) & 0b000011) {
-				case 0b01:
-					// EdgeOrder.FORWARD
-					return "->";
-				case 0b10:
-					// EdgeOrder.REVERSE
-					return "<-";
-				case 0b11:
-					// EdgeOrder.MUTUAL
-					return "<->";
-				default:
-					throw new IllegalArgumentException("Bitmask is missing an edge (mask = "
-						+ mask + ", shift = " + shift);
+			int edgeMask = (mask >>> shift) & 0b000011;
+
+			if (edgeMask == EdgeOrder.FORWARD.getBitmask()) {
+				return "->";
+			} else if (edgeMask == EdgeOrder.REVERSE.getBitmask()) {
+				return "<-";
+			} else if (edgeMask == EdgeOrder.MUTUAL.getBitmask()) {
+				return "<->";
+			} else {
+				throw new IllegalArgumentException("Bitmask is missing an edge (mask = "
+					+ mask + ", shift = " + shift);
 			}
 		}
 	}


[3/6] flink git commit: [FLINK-5532] [streaming api] Make marker window assigners for aligned window ops non-extendable

Posted by se...@apache.org.
[FLINK-5532] [streaming api] Make marker window assigners for aligned window ops non-extendable

Makes the TumblingAlignedProcessingTimeWindows and the
SlidingAlignedProcessingTimeWindows final so that users cannot
extend them.

This closes #3180


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c291fe6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c291fe6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c291fe6c

Branch: refs/heads/release-1.2
Commit: c291fe6c5e90ca7c6f11d2c93a4a3d370d5f8a8c
Parents: 1d6b8bb
Author: kl0u <kk...@gmail.com>
Authored: Fri Jan 20 14:31:48 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/api/datastream/WindowedStream.java    | 4 ++--
 .../windowing/assigners/SlidingAlignedProcessingTimeWindows.java | 2 +-
 .../assigners/TumblingAlignedProcessingTimeWindows.java          | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c291fe6c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 686e874..cd8385b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -1015,7 +1015,7 @@ public class WindowedStream<T, K, W extends Window> {
 			TypeInformation<R> resultType,
 			String functionName) {
 
-		if (windowAssigner instanceof SlidingAlignedProcessingTimeWindows && trigger == null && evictor == null) {
+		if (windowAssigner.getClass().equals(SlidingAlignedProcessingTimeWindows.class) && trigger == null && evictor == null) {
 			SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
 			final long windowLength = timeWindows.getSize();
 			final long windowSlide = timeWindows.getSlide();
@@ -1046,7 +1046,7 @@ public class WindowedStream<T, K, W extends Window> {
 						windowLength, windowSlide);
 				return input.transform(opName, resultType, op);
 			}
-		} else if (windowAssigner instanceof TumblingAlignedProcessingTimeWindows && trigger == null && evictor == null) {
+		} else if (windowAssigner.getClass().equals(TumblingAlignedProcessingTimeWindows.class) && trigger == null && evictor == null) {
 			TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
 			final long windowLength = timeWindows.getSize();
 			final long windowSlide = timeWindows.getSize();

http://git-wip-us.apache.org/repos/asf/flink/blob/c291fe6c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
index 743ee0b..a0e0bcb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
  * <p>
  * <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility is supported.
  * */
-public class SlidingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
+public final class SlidingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
 
 	private static final long serialVersionUID = 3695562702662473688L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c291fe6c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
index 007fae9..e1a8101 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
@@ -35,11 +35,11 @@ import org.apache.flink.streaming.api.windowing.time.Time;
  * <p>
  * <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility is supported.
  * */
-public class TumblingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
+public final class TumblingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
 
 	private static final long serialVersionUID = -6217477609512299842L;
 
-	protected TumblingAlignedProcessingTimeWindows(long size) {
+	public TumblingAlignedProcessingTimeWindows(long size) {
 		super(size);
 	}
 


[4/6] flink git commit: [FLINK-5532] [streaming api] Improve JavaDocs for assigners for Fast Aligned Windows to clarify role

Posted by se...@apache.org.
[FLINK-5532] [streaming api] Improve JavaDocs for assigners for Fast Aligned Windows to clarify role


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/feeb9d7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/feeb9d7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/feeb9d7b

Branch: refs/heads/release-1.2
Commit: feeb9d7bb7afb56749be501afb79f14617c455d7
Parents: c291fe6
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 11:24:01 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedStream.java          |  4 +-
 .../SlidingAlignedProcessingTimeWindows.java    | 41 +++++++++++++-------
 .../TumblingAlignedProcessingTimeWindows.java   | 37 ++++++++++++------
 3 files changed, 54 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/feeb9d7b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index cd8385b..51712e1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -1015,7 +1015,7 @@ public class WindowedStream<T, K, W extends Window> {
 			TypeInformation<R> resultType,
 			String functionName) {
 
-		if (windowAssigner.getClass().equals(SlidingAlignedProcessingTimeWindows.class) && trigger == null && evictor == null) {
+		if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
 			SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
 			final long windowLength = timeWindows.getSize();
 			final long windowSlide = timeWindows.getSlide();
@@ -1046,7 +1046,7 @@ public class WindowedStream<T, K, W extends Window> {
 						windowLength, windowSlide);
 				return input.transform(opName, resultType, op);
 			}
-		} else if (windowAssigner.getClass().equals(TumblingAlignedProcessingTimeWindows.class) && trigger == null && evictor == null) {
+		} else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
 			TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
 			final long windowLength = timeWindows.getSize();
 			final long windowSlide = timeWindows.getSize();

http://git-wip-us.apache.org/repos/asf/flink/blob/feeb9d7b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
index a0e0bcb..984c31a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
@@ -18,23 +18,36 @@
 
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 /**
- * A processing time sliding {@link WindowAssigner window assigner} used to perform windowing using the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator
- * AccumulatingProcessingTimeWindowOperator} and the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator
- * AggregatingProcessingTimeWindowOperator}.
- *
- * <p>
- * With this assigner, the {@code trigger} used is a
- * {@link org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
- * ProcessingTimeTrigger} and no {@code evictor} can be specified.
- *
- * <p>
- * <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility is supported.
- * */
+ * This is a special window assigner used to tell the system to use the
+ * <i>"Fast Aligned Processing Time Window Operator"</i> for windowing.
+ * 
+ * <p>Prior Flink versions used that operator automatically for simple processing time
+ * windows (tumbling and sliding) when no custom trigger and no evictor was specified.
+ * In the current Flink version, that operator is only used when programs explicitly 
+ * specify this window assigner. This is only intended for special cases where programs relied on
+ * the better performance of the fast aligned window operator, and are willing to accept the lack
+ * of support for various features as indicated below:
+ * 
+ * <ul>
+ *     <li>No custom state backend can be selected, the operator always stores data on the Java heap.</li>
+ *     <li>The operator does not support key groups, meaning it cannot change the parallelism.</li>
+ *     <li>Future versions of Flink may not be able to resume from checkpoints/savepoints taken by this
+ *         operator.</li>
+ * </ul>
+ * 
+ * <p>Future implementation plans: We plan to add some of the optimizations used by this operator to
+ * the general window operator, so that future versions of Flink will not have the performance/functionality
+ * trade-off any more.
+ * 
+ * <p>Note on implementation: The concrete operator instantiated by this assigner is either the
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator}
+ * or {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator}.
+ */
+@PublicEvolving
 public final class SlidingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
 
 	private static final long serialVersionUID = 3695562702662473688L;

http://git-wip-us.apache.org/repos/asf/flink/blob/feeb9d7b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
index e1a8101..c00eb7d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
@@ -18,23 +18,36 @@
 
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 /**
- * A processing time tumbling {@link WindowAssigner window assigner} used to perform windowing using the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator
- * AccumulatingProcessingTimeWindowOperator} and the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator
- * AggregatingProcessingTimeWindowOperator}.
+ * This is a special window assigner used to tell the system to use the
+ * <i>"Fast Aligned Processing Time Window Operator"</i> for windowing.
  *
- * <p>
- * With this assigner, the {@code trigger} used is a
- * {@link org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
- * ProcessingTimeTrigger} and no {@code evictor} can be specified.
+ * <p>Prior Flink versions used that operator automatically for simple processing time
+ * windows (tumbling and sliding) when no custom trigger and no evictor was specified.
+ * In the current Flink version, that operator is only used when programs explicitly 
+ * specify this window assigner. This is only intended for special cases where programs relied on
+ * the better performance of the fast aligned window operator, and are willing to accept the lack
+ * of support for various features as indicated below:
  *
- * <p>
- * <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility is supported.
- * */
+ * <ul>
+ *     <li>No custom state backend can be selected, the operator always stores data on the Java heap.</li>
+ *     <li>The operator does not support key groups, meaning it cannot change the parallelism.</li>
+ *     <li>Future versions of Flink may not be able to resume from checkpoints/savepoints taken by this
+ *         operator.</li>
+ * </ul>
+ *
+ * <p>Future implementation plans: We plan to add some of the optimizations used by this operator to
+ * the general window operator, so that future versions of Flink will not have the performance/functionality
+ * trade-off any more.
+ *
+ * <p>Note on implementation: The concrete operator instantiated by this assigner is either the
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator}
+ * or {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator}.
+ */
+@PublicEvolving
 public final class TumblingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
 
 	private static final long serialVersionUID = -6217477609512299842L;


[5/6] flink git commit: [FLINK-5562] [gelly] Driver fixes: Improve parametrization and output formatting.

Posted by se...@apache.org.
[FLINK-5562] [gelly] Driver fixes: Improve parametrization and output formatting.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c943e68
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c943e68
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c943e68

Branch: refs/heads/release-1.2
Commit: 3c943e6820525180089f3d402010e138fd9af54d
Parents: 37bffdd
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100

----------------------------------------------------------------------
 .../graph/drivers/ClusteringCoefficient.java    |  2 +-
 .../apache/flink/graph/drivers/Graph500.java    | 37 ++++++++++++++++----
 .../flink/graph/drivers/GraphMetrics.java       | 11 +++---
 .../org/apache/flink/graph/drivers/HITS.java    | 11 +++---
 .../flink/graph/drivers/JaccardIndex.java       | 11 +++---
 .../flink/graph/drivers/TriangleListing.java    |  7 ++--
 .../flink/graph/AbstractGraphAnalytic.java      |  5 +--
 .../org/apache/flink/graph/AnalyticHelper.java  |  8 ++++-
 8 files changed, 65 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index cd28ee4..79a17a4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -78,7 +78,7 @@ public class ClusteringCoefficient {
 			.appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
 				" the vertex, and the number of edges between vertex neighbors.", 80))
 			.appendNewLine()
-			.appendln("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendln("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat> --output <print | hash | csv>")
 			.appendNewLine()
 			.appendln("options:")
 			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
index 51ef66f..a4e7c01 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
 import org.apache.flink.graph.generator.random.RandomGenerableFactory;
@@ -53,8 +52,6 @@ public class Graph500 {
 
 	private static final int DEFAULT_EDGE_FACTOR = 16;
 
-	private static final boolean DEFAULT_SIMPLIFY = false;
-
 	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
 
 	private static String getUsage(String message) {
@@ -68,6 +65,9 @@ public class Graph500 {
 			.appendNewLine()
 			.appendln("Note: this does not yet implement permutation of vertex labels or edges.")
 			.appendNewLine()
+			.appendln("usage: Graph500 --directed <true | false> --simplify <true | false> --output <print | hash | csv>")
+			.appendNewLine()
+			.appendln("options:")
 			.appendln("  --output print")
 			.appendln("  --output hash")
 			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
@@ -84,6 +84,17 @@ public class Graph500 {
 		ParameterTool parameters = ParameterTool.fromArgs(args);
 		env.getConfig().setGlobalJobParameters(parameters);
 
+		if (! parameters.has("directed")) {
+			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
+		}
+		boolean directed = parameters.getBoolean("directed");
+
+		if (! parameters.has("simplify")) {
+			throw new ProgramParametrizationException(getUsage("must declare '--simplify true' or '--simplify false'"));
+		}
+		boolean simplify = parameters.getBoolean("simplify");
+
+
 		// Generate RMat graph
 		int scale = parameters.getInt("scale", DEFAULT_SCALE);
 		int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
@@ -93,14 +104,23 @@ public class Graph500 {
 		long vertexCount = 1L << scale;
 		long edgeCount = vertexCount * edgeFactor;
 
-		boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
 		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
 
 		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
 			.generate();
 
-		if (simplify) {
-			graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+		if (directed) {
+			if (simplify) {
+				graph = graph
+					.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+			}
+		} else {
+			if (simplify) {
+				graph = graph
+					.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+			} else {
+				graph = graph.getUndirected();
+			}
 		}
 
 		DataSet<Tuple2<LongValue,LongValue>> edges = graph
@@ -110,15 +130,17 @@ public class Graph500 {
 		// Print, hash, or write RMat graph to disk
 		switch (parameters.get("output", "")) {
 		case "print":
+			System.out.println();
 			edges.print();
 			break;
 
 		case "hash":
+			System.out.println();
 			System.out.println(DataSetUtils.checksumHashCode(edges));
 			break;
 
 		case "csv":
-			String filename = parameters.get("filename");
+			String filename = parameters.getRequired("output_filename");
 
 			String lineDelimiter = StringEscapeUtils.unescapeJava(
 				parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -137,6 +159,7 @@ public class Graph500 {
 		JobExecutionResult result = env.getLastJobExecutionResult();
 
 		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println();
 		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index 899ae66..9b246df 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -63,7 +63,7 @@ public class GraphMetrics {
 			.appendNewLine()
 			.appendln(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80))
 			.appendNewLine()
-			.appendln("usage: GraphMetrics --directed <true | false> --input <csv | rmat [options]>")
+			.appendln("usage: GraphMetrics --directed <true | false> --input <csv | rmat>")
 			.appendNewLine()
 			.appendln("options:")
 			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -98,7 +98,7 @@ public class GraphMetrics {
 					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
 				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
+					.fromCsvReader(parameters.getRequired("input_filename"), env)
 						.ignoreCommentsEdges("#")
 						.lineDelimiterEdges(lineDelimiter)
 						.fieldDelimiterEdges(fieldDelimiter);
@@ -225,14 +225,17 @@ public class GraphMetrics {
 
 		env.execute("Graph Metrics");
 
+		System.out.println();
 		System.out.print("Vertex metrics:\n  ");
 		System.out.println(vm.getResult().toString().replace(";", "\n "));
-		System.out.print("\nEdge metrics:\n  ");
+		System.out.println();
+		System.out.print("Edge metrics:\n  ");
 		System.out.println(em.getResult().toString().replace(";", "\n "));
 
 		JobExecutionResult result = env.getLastJobExecutionResult();
 
 		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("\nExecution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+		System.out.println();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index b035bd7..453b543 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -69,7 +69,7 @@ public class HITS {
 				" scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" +
 				" and good \"authorities\" are linked from good \"hubs\".", 80))
 			.appendNewLine()
-			.appendln("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendln("usage: HITS --input <csv | rmat> --output <print | hash | csv>")
 			.appendNewLine()
 			.appendln("options:")
 			.appendln("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -104,7 +104,7 @@ public class HITS {
 					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
 				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
+					.fromCsvReader(parameters.getRequired("input_filename"), env)
 						.ignoreCommentsEdges("#")
 						.lineDelimiterEdges(lineDelimiter)
 						.fieldDelimiterEdges(fieldDelimiter);
@@ -157,17 +157,19 @@ public class HITS {
 
 		switch (parameters.get("output", "")) {
 			case "print":
+				System.out.println();
 				for (Object e: hits.collect()) {
 					System.out.println(((Result)e).toVerboseString());
 				}
 				break;
 
 			case "hash":
+				System.out.println();
 				System.out.println(DataSetUtils.checksumHashCode(hits));
 				break;
 
 			case "csv":
-				String filename = parameters.get("output_filename");
+				String filename = parameters.getRequired("output_filename");
 
 				String lineDelimiter = StringEscapeUtils.unescapeJava(
 					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -177,7 +179,7 @@ public class HITS {
 
 				hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
 
-				env.execute();
+				env.execute("HITS");
 				break;
 			default:
 				throw new ProgramParametrizationException(getUsage("invalid output type"));
@@ -186,6 +188,7 @@ public class HITS {
 		JobExecutionResult result = env.getLastJobExecutionResult();
 
 		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println();
 		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index cb11af9..abb675a 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -75,7 +75,7 @@ public class JaccardIndex {
 			.appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
 				" number of shared neighbors, and the number of distinct neighbors.", 80))
 			.appendNewLine()
-			.appendln("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendln("usage: JaccardIndex --input <csv | rmat> --output <print | hash | csv>")
 			.appendNewLine()
 			.appendln("options:")
 			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -110,7 +110,7 @@ public class JaccardIndex {
 					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
 				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
+					.fromCsvReader(parameters.getRequired("input_filename"), env)
 						.ignoreCommentsEdges("#")
 						.lineDelimiterEdges(lineDelimiter)
 						.fieldDelimiterEdges(fieldDelimiter);
@@ -137,7 +137,7 @@ public class JaccardIndex {
 
 						if (parameters.getBoolean("simplify", false)) {
 							graph = graph
-								.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
+								.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
 									.setParallelism(little_parallelism));
 						}
 
@@ -189,6 +189,7 @@ public class JaccardIndex {
 
 		switch (parameters.get("output", "")) {
 			case "print":
+				System.out.println();
 				for (Object e: ji.collect()) {
 					Result result = (Result)e;
 					System.out.println(result.toVerboseString());
@@ -196,11 +197,12 @@ public class JaccardIndex {
 				break;
 
 			case "hash":
+				System.out.println();
 				System.out.println(DataSetUtils.checksumHashCode(ji));
 				break;
 
 			case "csv":
-				String filename = parameters.get("output_filename");
+				String filename = parameters.getRequired("output_filename");
 
 				String lineDelimiter = StringEscapeUtils.unescapeJava(
 					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -220,6 +222,7 @@ public class JaccardIndex {
 		JobExecutionResult result = env.getLastJobExecutionResult();
 
 		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println();
 		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 818b0d8..1fecc3d 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -116,7 +116,7 @@ public class TriangleListing {
 					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
 
 				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
+					.fromCsvReader(parameters.getRequired("input_filename"), env)
 						.ignoreCommentsEdges("#")
 						.lineDelimiterEdges(lineDelimiter)
 						.fieldDelimiterEdges(fieldDelimiter);
@@ -284,6 +284,7 @@ public class TriangleListing {
 
 		switch (parameters.get("output", "")) {
 			case "print":
+				System.out.println();
 				if (directedAlgorithm) {
 					for (Object e: tl.collect()) {
 						org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result =
@@ -296,11 +297,12 @@ public class TriangleListing {
 				break;
 
 			case "hash":
+				System.out.println();
 				System.out.println(DataSetUtils.checksumHashCode(tl));
 				break;
 
 			case "csv":
-				String filename = parameters.get("output_filename");
+				String filename = parameters.getRequired("output_filename");
 
 				String lineDelimiter = StringEscapeUtils.unescapeJava(
 					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -324,6 +326,7 @@ public class TriangleListing {
 		JobExecutionResult result = env.getLastJobExecutionResult();
 
 		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println();
 		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
index b13e82e..4d3d055 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
@@ -38,14 +38,12 @@ implements GraphAnalytic<K, VV, EV, T> {
 	public GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input)
 			throws Exception {
 		env = input.getContext();
-		return null;
+		return this;
 	}
 
 	@Override
 	public T execute()
 			throws Exception {
-		Preconditions.checkNotNull(env);
-
 		env.execute();
 		return getResult();
 	}
@@ -54,7 +52,6 @@ implements GraphAnalytic<K, VV, EV, T> {
 	public T execute(String jobName)
 			throws Exception {
 		Preconditions.checkNotNull(jobName);
-		Preconditions.checkNotNull(env);
 
 		env.execute(jobName);
 		return getResult();

http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
index b07a8c3..dbe3e0c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.graph;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -74,6 +76,10 @@ extends RichOutputFormat<T> {
 	 * @return The value of the accumulator with the given name
 	 */
 	public <A> A getAccumulator(ExecutionEnvironment env, String accumulatorName) {
-		return env.getLastJobExecutionResult().getAccumulatorResult(id + SEPARATOR + accumulatorName);
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		Preconditions.checkNotNull(result, "No result found for job, was execute() called before getting the result?");
+
+		return result.getAccumulatorResult(id + SEPARATOR + accumulatorName);
 	}
 }


[2/6] flink git commit: [hotfix] [build] Include flink-gelly-examples in opt/

Posted by se...@apache.org.
[hotfix] [build] Include flink-gelly-examples in opt/

Adds the flink-gelly-examples jar to the opt/ directory of the binary
release artifacts. The examples jar is referenced in the online documentation.

Corrects the file paths in the Gelly quickstart documentation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37bffdd2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37bffdd2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37bffdd2

Branch: refs/heads/release-1.2
Commit: 37bffdd2de3e5e14d3d3eda1371483609058106b
Parents: 7d3bfa2
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jan 20 13:37:10 2017 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100

----------------------------------------------------------------------
 docs/dev/libs/gelly/index.md           | 19 ++++++++++---------
 flink-dist/src/main/assemblies/opt.xml |  7 +++++++
 2 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/37bffdd2/docs/dev/libs/gelly/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/index.md b/docs/dev/libs/gelly/index.md
index d82fec0..8f8c6de 100644
--- a/docs/dev/libs/gelly/index.md
+++ b/docs/dev/libs/gelly/index.md
@@ -63,7 +63,8 @@ Add the following dependency to your `pom.xml` to use Gelly.
 </div>
 </div>
 
-Note that Gelly is currently not part of the binary distribution. See linking with it for cluster execution [here]({{ site.baseurl }}/dev/linking.html).
+Note that Gelly is not part of the binary distribution. See [linking]({{ site.baseurl }}/dev/linking.html) for
+instructions on packaging Gelly libraries into Flink user programs.
 
 The remaining sections provide a description of available methods and present several examples of how to use Gelly and how to mix it with the Flink DataSet API.
 
@@ -71,15 +72,15 @@ Running Gelly Examples
 ----------------------
 
 The Gelly library and examples jars are provided in the [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads")
-in the folder **opt/lib/gelly** (for versions older than Flink 1.2 these can be manually downloaded from
+in the folder **opt** (for versions older than Flink 1.2 these can be manually downloaded from
 [Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly).
 
 To run the Gelly examples the **flink-gelly** (for Java) or **flink-gelly-scala** (for Scala) jar must be copied to
 Flink's **lib** directory.
 
 ~~~bash
-cp opt/lib/gelly/flink-gelly_*.jar lib/
-cp opt/lib/gelly/flink-gelly-scala_*.jar lib/
+cp opt/flink-gelly_*.jar lib/
+cp opt/flink-gelly-scala_*.jar lib/
 ~~~
 
 Gelly's examples jar includes both drivers for the library methods as well as additional example algorithms. After
@@ -87,7 +88,7 @@ configuring and starting the cluster, list the available algorithm classes:
 
 ~~~bash
 ./bin/start-cluster.sh
-./bin/flink run opt/lib/gelly/flink-gelly-examples_*.jar
+./bin/flink run opt/flink-gelly-examples_*.jar
 ~~~
 
 The Gelly drivers can generate [RMat](http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) graph data or read the
@@ -95,7 +96,7 @@ edge list from a CSV file. Each node in a cluster must have access to the input
 directed generated graph:
 
 ~~~bash
-./bin/flink run -c org.apache.flink.graph.drivers.GraphMetrics opt/lib/gelly/flink-gelly-examples_*.jar \
+./bin/flink run -c org.apache.flink.graph.drivers.GraphMetrics opt/flink-gelly-examples_*.jar \
     --directed true --input rmat
 ~~~
 
@@ -110,14 +111,14 @@ Run a few algorithms and monitor the job progress in Flink's Web UI:
 ~~~bash
 wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz | gunzip -c > com-lj.ungraph.txt
 
-./bin/flink run -q -c org.apache.flink.graph.drivers.GraphMetrics opt/lib/gelly/flink-gelly-examples_*.jar \
+./bin/flink run -q -c org.apache.flink.graph.drivers.GraphMetrics opt/flink-gelly-examples_*.jar \
     --directed true --input csv --type integer --input_filename com-lj.ungraph.txt --input_field_delimiter '\t'
 
-./bin/flink run -q -c org.apache.flink.graph.drivers.ClusteringCoefficient opt/lib/gelly/flink-gelly-examples_*.jar \
+./bin/flink run -q -c org.apache.flink.graph.drivers.ClusteringCoefficient opt/flink-gelly-examples_*.jar \
     --directed true --input csv --type integer --input_filename com-lj.ungraph.txt  --input_field_delimiter '\t' \
     --output hash
 
-./bin/flink run -q -c org.apache.flink.graph.drivers.JaccardIndex opt/lib/gelly/flink-gelly-examples_*.jar \
+./bin/flink run -q -c org.apache.flink.graph.drivers.JaccardIndex opt/flink-gelly-examples_*.jar \
     --input csv --type integer --simplify true --input_filename com-lj.ungraph.txt --input_field_delimiter '\t' \
     --output hash
 ~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/37bffdd2/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index 3622ece..c6dc307 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -53,6 +53,13 @@
 		</file>
 
 		<file>
+			<source>../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-${project.version}.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-gelly-examples_2.10-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<file>
 			<source>../flink-libraries/flink-gelly-scala/target/flink-gelly-scala_2.10-${project.version}-jar-with-dependencies.jar</source>
 			<outputDirectory>opt/</outputDirectory>
 			<destName>flink-gelly-scala_2.10-${project.version}.jar</destName>