You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/23 12:58:38 UTC
[1/6] flink git commit: [FLINK-5118] [metrics] Fix inconsistent numBytesIn/Out metrics for network channels
Repository: flink
Updated Branches:
refs/heads/release-1.2 1d6b8bbce -> 940b4900d
[FLINK-5118] [metrics] Fix inconsistent numBytesIn/Out metrics for network channels
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/940b4900
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/940b4900
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/940b4900
Branch: refs/heads/release-1.2
Commit: 940b4900d1cfc19b32af2b3e8753d3b6e01622ee
Parents: 3c943e6
Author: zentol <ch...@apache.org>
Authored: Thu Jan 12 14:59:22 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100
----------------------------------------------------------------------
.../api/serialization/SpanningRecordSerializer.java | 8 --------
.../runtime/io/network/api/writer/RecordWriter.java | 11 ++++++++---
2 files changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/940b4900/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index a8fe3fe..57ed06f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -24,7 +24,6 @@ import java.nio.ByteOrder;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.DataOutputSerializer;
@@ -52,8 +51,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
/** Limit of current {@link MemorySegment} of target buffer */
private int limit;
- private transient Counter numBytesOut;
-
public SpanningRecordSerializer() {
this.serializationBuffer = new DataOutputSerializer(128);
@@ -81,10 +78,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
int len = this.serializationBuffer.length();
this.lengthBuffer.putInt(0, len);
-
- if (numBytesOut != null) {
- numBytesOut.inc(len);
- }
this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
@@ -187,6 +180,5 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
@Override
public void instantiateMetrics(TaskIOMetricGroup metrics) {
- numBytesOut = metrics.getNumBytesOutCounter();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/940b4900/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 799187d..f9671ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -57,6 +59,8 @@ public class RecordWriter<T extends IOReadableWritable> {
private final Random RNG = new XORShiftRandom();
+ private Counter numBytesOut = new SimpleCounter();
+
public RecordWriter(ResultPartitionWriter writer) {
this(writer, new RoundRobinChannelSelector<T>());
}
@@ -112,6 +116,7 @@ public class RecordWriter<T extends IOReadableWritable> {
Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) {
+ numBytesOut.inc(buffer.getSize());
writeAndClearBuffer(buffer, targetChannel, serializer);
// If this was a full record, we are done. Not breaking
@@ -137,6 +142,7 @@ public class RecordWriter<T extends IOReadableWritable> {
synchronized (serializer) {
Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) {
+ numBytesOut.inc(buffer.getSize());
writeAndClearBuffer(buffer, targetChannel, serializer);
} else if (serializer.hasData()) {
throw new IllegalStateException("No buffer, but serializer has buffered data.");
@@ -171,6 +177,7 @@ public class RecordWriter<T extends IOReadableWritable> {
Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) {
+ numBytesOut.inc(buffer.getSize());
writeAndClearBuffer(buffer, targetChannel, serializer);
}
} finally {
@@ -202,9 +209,7 @@ public class RecordWriter<T extends IOReadableWritable> {
* @param metrics
*/
public void setMetricGroup(TaskIOMetricGroup metrics) {
- for(RecordSerializer<?> serializer : serializers) {
- serializer.instantiateMetrics(metrics);
- }
+ numBytesOut = metrics.getNumBytesOutCounter();
}
/**
[6/6] flink git commit: [hotfix] [gelly] Fix TriangleListing EdgeOrder
Posted by se...@apache.org.
[hotfix] [gelly] Fix TriangleListing EdgeOrder
The edge bitmask was swapped in ed09dba.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d3bfa2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d3bfa2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d3bfa2f
Branch: refs/heads/release-1.2
Commit: 7d3bfa2fdfd9e3c30ce57309c12d5c2cff2a0f1c
Parents: feeb9d7
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jan 20 13:33:01 2017 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100
----------------------------------------------------------------------
.../clustering/directed/TriangleListing.java | 24 +++++++++-----------
1 file changed, 11 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7d3bfa2f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index e1b3040..a1006c4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -443,19 +443,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
}
private String maskToString(byte mask, int shift) {
- switch((mask >>> shift) & 0b000011) {
- case 0b01:
- // EdgeOrder.FORWARD
- return "->";
- case 0b10:
- // EdgeOrder.REVERSE
- return "<-";
- case 0b11:
- // EdgeOrder.MUTUAL
- return "<->";
- default:
- throw new IllegalArgumentException("Bitmask is missing an edge (mask = "
- + mask + ", shift = " + shift);
+ int edgeMask = (mask >>> shift) & 0b000011;
+
+ if (edgeMask == EdgeOrder.FORWARD.getBitmask()) {
+ return "->";
+ } else if (edgeMask == EdgeOrder.REVERSE.getBitmask()) {
+ return "<-";
+ } else if (edgeMask == EdgeOrder.MUTUAL.getBitmask()) {
+ return "<->";
+ } else {
+ throw new IllegalArgumentException("Bitmask is missing an edge (mask = "
+ + mask + ", shift = " + shift);
}
}
}
[3/6] flink git commit: [FLINK-5532] [streaming api] Make marker window assigners for aligned window ops non-extendable
Posted by se...@apache.org.
[FLINK-5532] [streaming api] Make marker window assigners for aligned window ops non-extendable
Makes the TumblingAlignedProcessingTimeWindows and the
SlidingAlignedProcessingTimeWindows final so that users cannot
extend them.
This closes #3180
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c291fe6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c291fe6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c291fe6c
Branch: refs/heads/release-1.2
Commit: c291fe6c5e90ca7c6f11d2c93a4a3d370d5f8a8c
Parents: 1d6b8bb
Author: kl0u <kk...@gmail.com>
Authored: Fri Jan 20 14:31:48 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100
----------------------------------------------------------------------
.../apache/flink/streaming/api/datastream/WindowedStream.java | 4 ++--
.../windowing/assigners/SlidingAlignedProcessingTimeWindows.java | 2 +-
.../assigners/TumblingAlignedProcessingTimeWindows.java | 4 ++--
3 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c291fe6c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 686e874..cd8385b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -1015,7 +1015,7 @@ public class WindowedStream<T, K, W extends Window> {
TypeInformation<R> resultType,
String functionName) {
- if (windowAssigner instanceof SlidingAlignedProcessingTimeWindows && trigger == null && evictor == null) {
+ if (windowAssigner.getClass().equals(SlidingAlignedProcessingTimeWindows.class) && trigger == null && evictor == null) {
SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
final long windowLength = timeWindows.getSize();
final long windowSlide = timeWindows.getSlide();
@@ -1046,7 +1046,7 @@ public class WindowedStream<T, K, W extends Window> {
windowLength, windowSlide);
return input.transform(opName, resultType, op);
}
- } else if (windowAssigner instanceof TumblingAlignedProcessingTimeWindows && trigger == null && evictor == null) {
+ } else if (windowAssigner.getClass().equals(TumblingAlignedProcessingTimeWindows.class) && trigger == null && evictor == null) {
TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
final long windowLength = timeWindows.getSize();
final long windowSlide = timeWindows.getSize();
http://git-wip-us.apache.org/repos/asf/flink/blob/c291fe6c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
index 743ee0b..a0e0bcb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
* <p>
* <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility is supported.
* */
-public class SlidingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
+public final class SlidingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
private static final long serialVersionUID = 3695562702662473688L;
http://git-wip-us.apache.org/repos/asf/flink/blob/c291fe6c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
index 007fae9..e1a8101 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
@@ -35,11 +35,11 @@ import org.apache.flink.streaming.api.windowing.time.Time;
* <p>
* <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility is supported.
* */
-public class TumblingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
+public final class TumblingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
private static final long serialVersionUID = -6217477609512299842L;
- protected TumblingAlignedProcessingTimeWindows(long size) {
+ public TumblingAlignedProcessingTimeWindows(long size) {
super(size);
}
[4/6] flink git commit: [FLINK-5532] [streaming api] Improve JavaDocs for assigners for Fast Aligned Windows to clarify role
Posted by se...@apache.org.
[FLINK-5532] [streaming api] Improve JavaDocs for assigners for Fast Aligned Windows to clarify role
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/feeb9d7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/feeb9d7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/feeb9d7b
Branch: refs/heads/release-1.2
Commit: feeb9d7bb7afb56749be501afb79f14617c455d7
Parents: c291fe6
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 11:24:01 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100
----------------------------------------------------------------------
.../api/datastream/WindowedStream.java | 4 +-
.../SlidingAlignedProcessingTimeWindows.java | 41 +++++++++++++-------
.../TumblingAlignedProcessingTimeWindows.java | 37 ++++++++++++------
3 files changed, 54 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/feeb9d7b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index cd8385b..51712e1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -1015,7 +1015,7 @@ public class WindowedStream<T, K, W extends Window> {
TypeInformation<R> resultType,
String functionName) {
- if (windowAssigner.getClass().equals(SlidingAlignedProcessingTimeWindows.class) && trigger == null && evictor == null) {
+ if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
final long windowLength = timeWindows.getSize();
final long windowSlide = timeWindows.getSlide();
@@ -1046,7 +1046,7 @@ public class WindowedStream<T, K, W extends Window> {
windowLength, windowSlide);
return input.transform(opName, resultType, op);
}
- } else if (windowAssigner.getClass().equals(TumblingAlignedProcessingTimeWindows.class) && trigger == null && evictor == null) {
+ } else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
final long windowLength = timeWindows.getSize();
final long windowSlide = timeWindows.getSize();
http://git-wip-us.apache.org/repos/asf/flink/blob/feeb9d7b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
index a0e0bcb..984c31a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
@@ -18,23 +18,36 @@
package org.apache.flink.streaming.api.windowing.assigners;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
- * A processing time sliding {@link WindowAssigner window assigner} used to perform windowing using the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator
- * AccumulatingProcessingTimeWindowOperator} and the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator
- * AggregatingProcessingTimeWindowOperator}.
- *
- * <p>
- * With this assigner, the {@code trigger} used is a
- * {@link org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
- * ProcessingTimeTrigger} and no {@code evictor} can be specified.
- *
- * <p>
- * <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility is supported.
- * */
+ * This is a special window assigner used to tell the system to use the
+ * <i>"Fast Aligned Processing Time Window Operator"</i> for windowing.
+ *
+ * <p>Prior Flink versions used that operator automatically for simple processing time
+ * windows (tumbling and sliding) when no custom trigger and no evictor was specified.
+ * In the current Flink version, that operator is only used when programs explicitly
+ * specify this window assigner. This is only intended for special cases where programs relied on
+ * the better performance of the fast aligned window operator, and are willing to accept the lack
+ * of support for various features as indicated below:
+ *
+ * <ul>
+ * <li>No custom state backend can be selected, the operator always stores data on the Java heap.</li>
+ * <li>The operator does not support key groups, meaning it cannot change the parallelism.</li>
+ * <li>Future versions of Flink may not be able to resume from checkpoints/savepoints taken by this
+ * operator.</li>
+ * </ul>
+ *
+ * <p>Future implementation plans: We plan to add some of the optimizations used by this operator to
+ * the general window operator, so that future versions of Flink will not have the performance/functionality
+ * trade-off any more.
+ *
+ * <p>Note on implementation: The concrete operator instantiated by this assigner is either the
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator}
+ * or {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator}.
+ */
+@PublicEvolving
public final class SlidingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
private static final long serialVersionUID = 3695562702662473688L;
http://git-wip-us.apache.org/repos/asf/flink/blob/feeb9d7b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
index e1a8101..c00eb7d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
@@ -18,23 +18,36 @@
package org.apache.flink.streaming.api.windowing.assigners;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
- * A processing time tumbling {@link WindowAssigner window assigner} used to perform windowing using the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator
- * AccumulatingProcessingTimeWindowOperator} and the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator
- * AggregatingProcessingTimeWindowOperator}.
+ * This is a special window assigner used to tell the system to use the
+ * <i>"Fast Aligned Processing Time Window Operator"</i> for windowing.
*
- * <p>
- * With this assigner, the {@code trigger} used is a
- * {@link org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
- * ProcessingTimeTrigger} and no {@code evictor} can be specified.
+ * <p>Prior Flink versions used that operator automatically for simple processing time
+ * windows (tumbling and sliding) when no custom trigger and no evictor was specified.
+ * In the current Flink version, that operator is only used when programs explicitly
+ * specify this window assigner. This is only intended for special cases where programs relied on
+ * the better performance of the fast aligned window operator, and are willing to accept the lack
+ * of support for various features as indicated below:
*
- * <p>
- * <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility is supported.
- * */
+ * <ul>
+ * <li>No custom state backend can be selected, the operator always stores data on the Java heap.</li>
+ * <li>The operator does not support key groups, meaning it cannot change the parallelism.</li>
+ * <li>Future versions of Flink may not be able to resume from checkpoints/savepoints taken by this
+ * operator.</li>
+ * </ul>
+ *
+ * <p>Future implementation plans: We plan to add some of the optimizations used by this operator to
+ * the general window operator, so that future versions of Flink will not have the performance/functionality
+ * trade-off any more.
+ *
+ * <p>Note on implementation: The concrete operator instantiated by this assigner is either the
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator}
+ * or {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator}.
+ */
+@PublicEvolving
public final class TumblingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
private static final long serialVersionUID = -6217477609512299842L;
[5/6] flink git commit: [FLINK-5562] [gelly] Driver fixes: Improve parametrization and output formatting.
Posted by se...@apache.org.
[FLINK-5562] [gelly] Driver fixes: Improve parametrization and output formatting.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c943e68
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c943e68
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c943e68
Branch: refs/heads/release-1.2
Commit: 3c943e6820525180089f3d402010e138fd9af54d
Parents: 37bffdd
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100
----------------------------------------------------------------------
.../graph/drivers/ClusteringCoefficient.java | 2 +-
.../apache/flink/graph/drivers/Graph500.java | 37 ++++++++++++++++----
.../flink/graph/drivers/GraphMetrics.java | 11 +++---
.../org/apache/flink/graph/drivers/HITS.java | 11 +++---
.../flink/graph/drivers/JaccardIndex.java | 11 +++---
.../flink/graph/drivers/TriangleListing.java | 7 ++--
.../flink/graph/AbstractGraphAnalytic.java | 5 +--
.../org/apache/flink/graph/AnalyticHelper.java | 8 ++++-
8 files changed, 65 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index cd28ee4..79a17a4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -78,7 +78,7 @@ public class ClusteringCoefficient {
.appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
" the vertex, and the number of edges between vertex neighbors.", 80))
.appendNewLine()
- .appendln("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+ .appendln("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat> --output <print | hash | csv>")
.appendNewLine()
.appendln("options:")
.appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
index 51ef66f..a4e7c01 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.client.program.ProgramParametrizationException;
import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.graph.generator.random.RandomGenerableFactory;
@@ -53,8 +52,6 @@ public class Graph500 {
private static final int DEFAULT_EDGE_FACTOR = 16;
- private static final boolean DEFAULT_SIMPLIFY = false;
-
private static final boolean DEFAULT_CLIP_AND_FLIP = true;
private static String getUsage(String message) {
@@ -68,6 +65,9 @@ public class Graph500 {
.appendNewLine()
.appendln("Note: this does not yet implement permutation of vertex labels or edges.")
.appendNewLine()
+ .appendln("usage: Graph500 --directed <true | false> --simplify <true | false> --output <print | hash | csv>")
+ .appendNewLine()
+ .appendln("options:")
.appendln(" --output print")
.appendln(" --output hash")
.appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
@@ -84,6 +84,17 @@ public class Graph500 {
ParameterTool parameters = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(parameters);
+ if (! parameters.has("directed")) {
+ throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
+ }
+ boolean directed = parameters.getBoolean("directed");
+
+ if (! parameters.has("simplify")) {
+ throw new ProgramParametrizationException(getUsage("must declare '--simplify true' or '--simplify false'"));
+ }
+ boolean simplify = parameters.getBoolean("simplify");
+
+
// Generate RMat graph
int scale = parameters.getInt("scale", DEFAULT_SCALE);
int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
@@ -93,14 +104,23 @@ public class Graph500 {
long vertexCount = 1L << scale;
long edgeCount = vertexCount * edgeFactor;
- boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.generate();
- if (simplify) {
- graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+ if (directed) {
+ if (simplify) {
+ graph = graph
+ .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+ }
+ } else {
+ if (simplify) {
+ graph = graph
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+ } else {
+ graph = graph.getUndirected();
+ }
}
DataSet<Tuple2<LongValue,LongValue>> edges = graph
@@ -110,15 +130,17 @@ public class Graph500 {
// Print, hash, or write RMat graph to disk
switch (parameters.get("output", "")) {
case "print":
+ System.out.println();
edges.print();
break;
case "hash":
+ System.out.println();
System.out.println(DataSetUtils.checksumHashCode(edges));
break;
case "csv":
- String filename = parameters.get("filename");
+ String filename = parameters.getRequired("output_filename");
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -137,6 +159,7 @@ public class Graph500 {
JobExecutionResult result = env.getLastJobExecutionResult();
NumberFormat nf = NumberFormat.getInstance();
+ System.out.println();
System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index 899ae66..9b246df 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -63,7 +63,7 @@ public class GraphMetrics {
.appendNewLine()
.appendln(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80))
.appendNewLine()
- .appendln("usage: GraphMetrics --directed <true | false> --input <csv | rmat [options]>")
+ .appendln("usage: GraphMetrics --directed <true | false> --input <csv | rmat>")
.appendNewLine()
.appendln("options:")
.appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -98,7 +98,7 @@ public class GraphMetrics {
parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
GraphCsvReader reader = Graph
- .fromCsvReader(parameters.get("input_filename"), env)
+ .fromCsvReader(parameters.getRequired("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter);
@@ -225,14 +225,17 @@ public class GraphMetrics {
env.execute("Graph Metrics");
+ System.out.println();
System.out.print("Vertex metrics:\n ");
System.out.println(vm.getResult().toString().replace(";", "\n "));
- System.out.print("\nEdge metrics:\n ");
+ System.out.println();
+ System.out.print("Edge metrics:\n ");
System.out.println(em.getResult().toString().replace(";", "\n "));
JobExecutionResult result = env.getLastJobExecutionResult();
NumberFormat nf = NumberFormat.getInstance();
- System.out.println("\nExecution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+ System.out.println();
+ System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index b035bd7..453b543 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -69,7 +69,7 @@ public class HITS {
" scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" +
" and good \"authorities\" are linked from good \"hubs\".", 80))
.appendNewLine()
- .appendln("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+ .appendln("usage: HITS --input <csv | rmat> --output <print | hash | csv>")
.appendNewLine()
.appendln("options:")
.appendln(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -104,7 +104,7 @@ public class HITS {
parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
GraphCsvReader reader = Graph
- .fromCsvReader(parameters.get("input_filename"), env)
+ .fromCsvReader(parameters.getRequired("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter);
@@ -157,17 +157,19 @@ public class HITS {
switch (parameters.get("output", "")) {
case "print":
+ System.out.println();
for (Object e: hits.collect()) {
System.out.println(((Result)e).toVerboseString());
}
break;
case "hash":
+ System.out.println();
System.out.println(DataSetUtils.checksumHashCode(hits));
break;
case "csv":
- String filename = parameters.get("output_filename");
+ String filename = parameters.getRequired("output_filename");
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -177,7 +179,7 @@ public class HITS {
hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
- env.execute();
+ env.execute("HITS");
break;
default:
throw new ProgramParametrizationException(getUsage("invalid output type"));
@@ -186,6 +188,7 @@ public class HITS {
JobExecutionResult result = env.getLastJobExecutionResult();
NumberFormat nf = NumberFormat.getInstance();
+ System.out.println();
System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index cb11af9..abb675a 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -75,7 +75,7 @@ public class JaccardIndex {
.appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
" number of shared neighbors, and the number of distinct neighbors.", 80))
.appendNewLine()
- .appendln("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+ .appendln("usage: JaccardIndex --input <csv | rmat> --output <print | hash | csv>")
.appendNewLine()
.appendln("options:")
.appendln(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -110,7 +110,7 @@ public class JaccardIndex {
parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
GraphCsvReader reader = Graph
- .fromCsvReader(parameters.get("input_filename"), env)
+ .fromCsvReader(parameters.getRequired("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter);
@@ -137,7 +137,7 @@ public class JaccardIndex {
if (parameters.getBoolean("simplify", false)) {
graph = graph
- .run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
.setParallelism(little_parallelism));
}
@@ -189,6 +189,7 @@ public class JaccardIndex {
switch (parameters.get("output", "")) {
case "print":
+ System.out.println();
for (Object e: ji.collect()) {
Result result = (Result)e;
System.out.println(result.toVerboseString());
@@ -196,11 +197,12 @@ public class JaccardIndex {
break;
case "hash":
+ System.out.println();
System.out.println(DataSetUtils.checksumHashCode(ji));
break;
case "csv":
- String filename = parameters.get("output_filename");
+ String filename = parameters.getRequired("output_filename");
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -220,6 +222,7 @@ public class JaccardIndex {
JobExecutionResult result = env.getLastJobExecutionResult();
NumberFormat nf = NumberFormat.getInstance();
+ System.out.println();
System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 818b0d8..1fecc3d 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -116,7 +116,7 @@ public class TriangleListing {
parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
GraphCsvReader reader = Graph
- .fromCsvReader(parameters.get("input_filename"), env)
+ .fromCsvReader(parameters.getRequired("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter);
@@ -284,6 +284,7 @@ public class TriangleListing {
switch (parameters.get("output", "")) {
case "print":
+ System.out.println();
if (directedAlgorithm) {
for (Object e: tl.collect()) {
org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result =
@@ -296,11 +297,12 @@ public class TriangleListing {
break;
case "hash":
+ System.out.println();
System.out.println(DataSetUtils.checksumHashCode(tl));
break;
case "csv":
- String filename = parameters.get("output_filename");
+ String filename = parameters.getRequired("output_filename");
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
@@ -324,6 +326,7 @@ public class TriangleListing {
JobExecutionResult result = env.getLastJobExecutionResult();
NumberFormat nf = NumberFormat.getInstance();
+ System.out.println();
System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
index b13e82e..4d3d055 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
@@ -38,14 +38,12 @@ implements GraphAnalytic<K, VV, EV, T> {
public GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input)
throws Exception {
env = input.getContext();
- return null;
+ return this;
}
@Override
public T execute()
throws Exception {
- Preconditions.checkNotNull(env);
-
env.execute();
return getResult();
}
@@ -54,7 +52,6 @@ implements GraphAnalytic<K, VV, EV, T> {
public T execute(String jobName)
throws Exception {
Preconditions.checkNotNull(jobName);
- Preconditions.checkNotNull(env);
env.execute(jobName);
return getResult();
http://git-wip-us.apache.org/repos/asf/flink/blob/3c943e68/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
index b07a8c3..dbe3e0c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AnalyticHelper.java
@@ -18,11 +18,13 @@
package org.apache.flink.graph;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.Serializable;
@@ -74,6 +76,10 @@ extends RichOutputFormat<T> {
* @return The value of the accumulator with the given name
*/
public <A> A getAccumulator(ExecutionEnvironment env, String accumulatorName) {
- return env.getLastJobExecutionResult().getAccumulatorResult(id + SEPARATOR + accumulatorName);
+ JobExecutionResult result = env.getLastJobExecutionResult();
+
+ Preconditions.checkNotNull(result, "No result found for job, was execute() called before getting the result?");
+
+ return result.getAccumulatorResult(id + SEPARATOR + accumulatorName);
}
}
[2/6] flink git commit: [hotfix] [build] Include flink-gelly-examples
in opt/
Posted by se...@apache.org.
[hotfix] [build] Include flink-gelly-examples in opt/
Adds the flink-gelly-examples jar to the opt/ directory of the binary
release artifacts. The examples jar is referenced in the online documentation.
Corrects the file paths in the Gelly quickstart documentation.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37bffdd2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37bffdd2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37bffdd2
Branch: refs/heads/release-1.2
Commit: 37bffdd2de3e5e14d3d3eda1371483609058106b
Parents: 7d3bfa2
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jan 20 13:37:10 2017 -0500
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100
----------------------------------------------------------------------
docs/dev/libs/gelly/index.md | 19 ++++++++++---------
flink-dist/src/main/assemblies/opt.xml | 7 +++++++
2 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/37bffdd2/docs/dev/libs/gelly/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/index.md b/docs/dev/libs/gelly/index.md
index d82fec0..8f8c6de 100644
--- a/docs/dev/libs/gelly/index.md
+++ b/docs/dev/libs/gelly/index.md
@@ -63,7 +63,8 @@ Add the following dependency to your `pom.xml` to use Gelly.
</div>
</div>
-Note that Gelly is currently not part of the binary distribution. See linking with it for cluster execution [here]({{ site.baseurl }}/dev/linking.html).
+Note that Gelly is not part of the binary distribution. See [linking]({{ site.baseurl }}/dev/linking.html) for
+instructions on packaging Gelly libraries into Flink user programs.
The remaining sections provide a description of available methods and present several examples of how to use Gelly and how to mix it with the Flink DataSet API.
@@ -71,15 +72,15 @@ Running Gelly Examples
----------------------
The Gelly library and examples jars are provided in the [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads")
-in the folder **opt/lib/gelly** (for versions older than Flink 1.2 these can be manually downloaded from
+in the folder **opt** (for versions older than Flink 1.2 these can be manually downloaded from
[Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly).
To run the Gelly examples the **flink-gelly** (for Java) or **flink-gelly-scala** (for Scala) jar must be copied to
Flink's **lib** directory.
~~~bash
-cp opt/lib/gelly/flink-gelly_*.jar lib/
-cp opt/lib/gelly/flink-gelly-scala_*.jar lib/
+cp opt/flink-gelly_*.jar lib/
+cp opt/flink-gelly-scala_*.jar lib/
~~~
Gelly's examples jar includes both drivers for the library methods as well as additional example algorithms. After
@@ -87,7 +88,7 @@ configuring and starting the cluster, list the available algorithm classes:
~~~bash
./bin/start-cluster.sh
-./bin/flink run opt/lib/gelly/flink-gelly-examples_*.jar
+./bin/flink run opt/flink-gelly-examples_*.jar
~~~
The Gelly drivers can generate [RMat](http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) graph data or read the
@@ -95,7 +96,7 @@ edge list from a CSV file. Each node in a cluster must have access to the input
directed generated graph:
~~~bash
-./bin/flink run -c org.apache.flink.graph.drivers.GraphMetrics opt/lib/gelly/flink-gelly-examples_*.jar \
+./bin/flink run -c org.apache.flink.graph.drivers.GraphMetrics opt/flink-gelly-examples_*.jar \
--directed true --input rmat
~~~
@@ -110,14 +111,14 @@ Run a few algorithms and monitor the job progress in Flink's Web UI:
~~~bash
wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz | gunzip -c > com-lj.ungraph.txt
-./bin/flink run -q -c org.apache.flink.graph.drivers.GraphMetrics opt/lib/gelly/flink-gelly-examples_*.jar \
+./bin/flink run -q -c org.apache.flink.graph.drivers.GraphMetrics opt/flink-gelly-examples_*.jar \
--directed true --input csv --type integer --input_filename com-lj.ungraph.txt --input_field_delimiter '\t'
-./bin/flink run -q -c org.apache.flink.graph.drivers.ClusteringCoefficient opt/lib/gelly/flink-gelly-examples_*.jar \
+./bin/flink run -q -c org.apache.flink.graph.drivers.ClusteringCoefficient opt/flink-gelly-examples_*.jar \
--directed true --input csv --type integer --input_filename com-lj.ungraph.txt --input_field_delimiter '\t' \
--output hash
-./bin/flink run -q -c org.apache.flink.graph.drivers.JaccardIndex opt/lib/gelly/flink-gelly-examples_*.jar \
+./bin/flink run -q -c org.apache.flink.graph.drivers.JaccardIndex opt/flink-gelly-examples_*.jar \
--input csv --type integer --simplify true --input_filename com-lj.ungraph.txt --input_field_delimiter '\t' \
--output hash
~~~
http://git-wip-us.apache.org/repos/asf/flink/blob/37bffdd2/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index 3622ece..c6dc307 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -53,6 +53,13 @@
</file>
<file>
+ <source>../flink-libraries/flink-gelly-examples/target/flink-gelly-examples_2.10-${project.version}.jar</source>
+ <outputDirectory>opt/</outputDirectory>
+ <destName>flink-gelly-examples_2.10-${project.version}.jar</destName>
+ <fileMode>0644</fileMode>
+ </file>
+
+ <file>
<source>../flink-libraries/flink-gelly-scala/target/flink-gelly-scala_2.10-${project.version}-jar-with-dependencies.jar</source>
<outputDirectory>opt/</outputDirectory>
<destName>flink-gelly-scala_2.10-${project.version}.jar</destName>