Posted to commits@flink.apache.org by uc...@apache.org on 2016/04/11 14:20:17 UTC

[1/2] flink git commit: [FLINK-3634] [docs] Fix documentation for DataSetUtils.zipWithUniqueId()

Repository: flink
Updated Branches:
  refs/heads/master ed1e52a10 -> e16ca8460


[FLINK-3634] [docs] Fix documentation for DataSetUtils.zipWithUniqueId()

This closes #1817.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76f95f76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76f95f76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76f95f76

Branch: refs/heads/master
Commit: 76f95f76af0368c18d9a69136f5293d6d0da843e
Parents: ed1e52a
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Mar 18 10:44:03 2016 -0400
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Apr 11 14:19:07 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/zip_elements_guide.md | 30 ++++++++++++++++--------------
 1 file changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/76f95f76/docs/apis/batch/zip_elements_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/zip_elements_guide.md b/docs/apis/batch/zip_elements_guide.md
index 8048f1c..59f723a 100644
--- a/docs/apis/batch/zip_elements_guide.md
+++ b/docs/apis/batch/zip_elements_guide.md
@@ -32,15 +32,17 @@ This document shows how {% gh_link /flink-java/src/main/java/org/apache/flink/ap
 {:toc}
 
 ### Zip with a Dense Index
-For assigning consecutive labels to the elements, the `zipWithIndex` method should be called. It receives a data set as input and returns a new data set of unique id, initial value tuples.
+`zipWithIndex` assigns consecutive labels to the elements, receiving a data set as input and returning a new data set of `(unique id, initial value)` 2-tuples.
+This process requires two passes, first counting then labeling elements, and cannot be pipelined due to the synchronization of counts.
+The alternative `zipWithUniqueId` works in a pipelined fashion and is preferred when a unique labeling is sufficient.
 For example, the following code:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setParallelism(1);
-DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F");
+env.setParallelism(2);
+DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
 
 DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithIndex(in);
 
@@ -54,8 +56,8 @@ env.execute();
 import org.apache.flink.api.scala._
 
 val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-env.setParallelism(1)
-val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F")
+env.setParallelism(2)
+val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
 
 val result: DataSet[(Long, String)] = input.zipWithIndex
 
@@ -66,21 +68,21 @@ env.execute()
 
 </div>
 
-will yield the tuples: (0,A), (1,B), (2,C), (3,D), (4,E), (5,F)
+may yield the tuples: (0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)
 
 [Back to top](#top)
 
-### Zip with an Unique Identifier
-In many cases, one may not need to assign consecutive labels.
-`zipWIthUniqueId` works in a pipelined fashion, speeding up the label assignment process. This method receives a data set as input and returns a new data set of unique id, initial value tuples.
+### Zip with a Unique Identifier
+In many cases one may not need to assign consecutive labels.
+`zipWithUniqueId` works in a pipelined fashion, speeding up the label assignment process. This method receives a data set as input and returns a new data set of `(unique id, initial value)` 2-tuples.
 For example, the following code:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setParallelism(1);
-DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F");
+env.setParallelism(2);
+DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");
 
 DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithUniqueId(in);
 
@@ -94,8 +96,8 @@ env.execute();
 import org.apache.flink.api.scala._
 
 val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-env.setParallelism(1)
-val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F")
+env.setParallelism(2)
+val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
 
 val result: DataSet[(Long, String)] = input.zipWithUniqueId
 
@@ -106,6 +108,6 @@ env.execute()
 
 </div>
 
-will yield the tuples: (0,A), (2,B), (4,C), (6,D), (8,E), (10,F)
+may yield the tuples: (0,G), (1,A), (2,H), (3,B), (5,C), (7,D), (9,E), (11,F)
 
 [Back to top](#top)
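
The two labeling schemes in the diff above can be illustrated with a small plain-Python sketch. It does not use Flink. The split of the eight elements into two partitions is an assumption chosen to match the sample output, and the id formula for `zip_with_unique_id` (local counter shifted left by the bit size of `parallelism - 1`, plus the subtask index) is likewise an assumption that happens to be consistent with the documented tuples. The function names are hypothetical.

```python
def zip_with_index(partitions):
    """Dense labels: count every partition first, then label with running offsets."""
    counts = [len(p) for p in partitions]                     # pass 1: count
    offsets = [sum(counts[:i]) for i in range(len(counts))]   # synchronize the counts
    return [(offsets[i] + j, value)                           # pass 2: label
            for i, p in enumerate(partitions)
            for j, value in enumerate(p)]


def zip_with_unique_id(partitions):
    """Unique but non-dense labels: each partition labels independently."""
    shift = (len(partitions) - 1).bit_length()  # assumed bit size of (parallelism - 1)
    return [((j << shift) + i, value)           # assumed id: (counter << shift) + subtask
            for i, p in enumerate(partitions)
            for j, value in enumerate(p)]


# Assumed partitioning with parallelism 2, chosen to match the sample output.
partitions = [["G", "H"], ["A", "B", "C", "D", "E", "F"]]

print(zip_with_index(partitions))
print(sorted(zip_with_unique_id(partitions)))
```

Under these assumptions the first call needs every partition's count before any label can be emitted, which is why it cannot be pipelined, while the second needs only the subtask index and a local counter.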


[2/2] flink git commit: [FLINK-3469] [docs] Improve documentation for grouping keys

Posted by uc...@apache.org.
[FLINK-3469] [docs] Improve documentation for grouping keys

This closes #1858.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e16ca846
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e16ca846
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e16ca846

Branch: refs/heads/master
Commit: e16ca8460933eadc416aa0f3ade41ef6feb8ae5d
Parents: 76f95f7
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Apr 6 15:44:47 2016 -0400
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Apr 11 14:19:36 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/dataset_transformations.md | 120 +++++++++++++++++++++---
 1 file changed, 105 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e16ca846/docs/apis/batch/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/dataset_transformations.md b/docs/apis/batch/dataset_transformations.md
index 31a1dfa..be9691c 100644
--- a/docs/apis/batch/dataset_transformations.md
+++ b/docs/apis/batch/dataset_transformations.md
@@ -275,6 +275,69 @@ element using a user-defined reduce function.
 For each group of input elements, a reduce function successively combines pairs of elements into one
 element until only a single element for each group remains.
 
+#### Reduce on DataSet Grouped by Key Expression
+
+Key expressions specify one or more fields of each element of a DataSet. Each key expression is
+either the name of a public field or a getter method. A dot can be used to drill down into objects.
+The key expression "*" selects all fields.
+The following code shows how to group a POJO DataSet using key expressions and to reduce it
+with a reduce function.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// some ordinary POJO
+public class WC {
+  public String word;
+  public int count;
+  // [...]
+}
+
+// ReduceFunction that sums Integer attributes of a POJO
+public class WordCounter implements ReduceFunction<WC> {
+  @Override
+  public WC reduce(WC in1, WC in2) {
+    return new WC(in1.word, in1.count + in2.count);
+  }
+}
+
+// [...]
+DataSet<WC> words = // [...]
+DataSet<WC> wordCounts = words
+                         // DataSet grouping on field "word"
+                         .groupBy("word")
+                         // apply ReduceFunction on grouped DataSet
+                         .reduce(new WordCounter());
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+// some ordinary POJO
+class WC(val word: String, val count: Int) {
+  def this() {
+    this(null, -1)
+  }
+  // [...]
+}
+
+val words: DataSet[WC] = // [...]
+val wordCounts = words.groupBy("word").reduce {
+  (w1, w2) => new WC(w1.word, w1.count + w2.count)
+}
+~~~
+
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+</div>
+</div>
+
 #### Reduce on DataSet Grouped by KeySelector Function
 
 A key-selector function extracts a key value from each element of a DataSet. The extracted key
@@ -305,9 +368,16 @@ public class WordCounter implements ReduceFunction<WC> {
 DataSet<WC> words = // [...]
 DataSet<WC> wordCounts = words
                          // DataSet grouping on field "word"
-                         .groupBy("word")
+                         .groupBy(new SelectWord())
                          // apply ReduceFunction on grouped DataSet
                          .reduce(new WordCounter());
+
+public class SelectWord implements KeySelector<WC, String> {
+  @Override
+  public String getKey(WC w) {
+    return w.word;
+  }
+}
 ~~~
 
 </div>
@@ -332,7 +402,14 @@ val wordCounts = words.groupBy { _.word } reduce {
 <div data-lang="python" markdown="1">
 
 ~~~python
-Not supported.
+class WordCounter(ReduceFunction):
+    def reduce(self, in1, in2):
+        return (in1[0], in1[1] + in2[1])
+
+words = # [...]
+wordCounts = words \
+    .group_by(lambda x: x[0]) \
+    .reduce(WordCounter())
 ~~~
 </div>
 </div>
@@ -347,10 +424,9 @@ The following code shows how to use field position keys and apply a reduce funct
 
 ~~~java
 DataSet<Tuple3<String, Integer, Double>> tuples = // [...]
-DataSet<Tuple3<String, Integer, Double>> reducedTuples =
-                                         tuples
+DataSet<Tuple3<String, Integer, Double>> reducedTuples = tuples
                                          // group DataSet on first and second field of Tuple
-                                         .groupBy(0,1)
+                                         .groupBy(0, 1)
                                          // apply ReduceFunction on grouped DataSet
                                          .reduce(new MyTupleReducer());
 ~~~
@@ -364,11 +440,29 @@ val tuples = DataSet[(String, Int, Double)] = // [...]
 val reducedTuples = tuples.groupBy(0, 1).reduce { ... }
 ~~~
 
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ reducedTuples = tuples.group_by(0, 1).reduce( ... )
+~~~
+
+</div>
+</div>
 
 #### Reduce on DataSet grouped by Case Class Fields
 
 When using Case Classes you can also specify the grouping key using the names of the fields:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+Not supported.
+~~~
+</div>
+<div data-lang="scala" markdown="1">
+
 ~~~scala
 case class MyClass(val a: String, b: Int, c: Double)
 val tuples = DataSet[MyClass] = // [...]
@@ -380,9 +474,8 @@ val reducedTuples = tuples.groupBy("a", "b").reduce { ... }
 <div data-lang="python" markdown="1">
 
 ~~~python
- reducedTuples = tuples.group_by(0, 1).reduce( ... )
+Not supported.
 ~~~
-
 </div>
 </div>
 
@@ -442,11 +535,6 @@ val output = input.groupBy(0).reduceGroup {
     }
 ~~~
 
-#### GroupReduce on DataSet Grouped by Case Class Fields
-
-Works analogous to grouping by Case Class fields in *Reduce* transformations.
-
-
 </div>
 <div data-lang="python" markdown="1">
 
@@ -462,13 +550,15 @@ Works analogous to grouping by Case Class fields in *Reduce* transformations.
  output = data.group_by(0).reduce_group(DistinctReduce())
 ~~~
 
-
 </div>
 </div>
 
-#### GroupReduce on DataSet Grouped by KeySelector Function
+#### GroupReduce on DataSet Grouped by Key Expression, KeySelector Function, or Case Class Fields
+
+These work analogously to [key expressions](#reduce-on-dataset-grouped-by-key-expression),
+[key-selector functions](#reduce-on-dataset-grouped-by-keyselector-function),
+and [case class fields](#reduce-on-dataset-grouped-by-case-class-fields) in *Reduce* transformations.
 
-Works analogous to key-selector functions in *Reduce* transformations.
 
 #### GroupReduce on sorted groups
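
As an aside on the grouping keys documented in this commit, the relationship between a key expression and a key-selector function can be sketched in plain Python. This is not the Flink API: `group_reduce` and `by_field` are hypothetical helpers, and `WC` is a stand-in for the POJO in the examples. The sketch only shows that a key expression such as `"word"` behaves like a key selector that reads that field.

```python
from collections import namedtuple
from functools import reduce

# Stand-in for the WC POJO used in the documentation examples.
WC = namedtuple("WC", ["word", "count"])


def group_reduce(elements, key_selector, reduce_fn):
    """Group elements by the extracted key, then combine each group pairwise."""
    groups = {}
    for element in elements:
        groups.setdefault(key_selector(element), []).append(element)
    return [reduce(reduce_fn, group) for group in groups.values()]


def by_field(name):
    """Hypothetical analogue of a key expression: select a field by its name."""
    return lambda element: getattr(element, name)


def sum_counts(w1, w2):
    """Reduce function that sums the counts of two elements of the same group."""
    return WC(w1.word, w1.count + w2.count)


words = [WC("a", 1), WC("b", 2), WC("a", 3)]

by_expression = group_reduce(words, by_field("word"), sum_counts)
by_selector = group_reduce(words, lambda w: w.word, sum_counts)

print(by_expression)
print(by_selector)
```

Both calls produce the same groups, since the field lookup and the lambda extract the same key from each element.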