Posted to commits@flink.apache.org by uc...@apache.org on 2015/04/22 16:47:37 UTC

flink git commit: [FLINK-1422] [docs] Add function parametrization docs

Repository: flink
Updated Branches:
  refs/heads/master f1ee90ccb -> 4e61775f7


[FLINK-1422] [docs] Add function parametrization docs

This closes #350.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e61775f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e61775f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e61775f

Branch: refs/heads/master
Commit: 4e61775f71da50cc69a66ac26ca45e23affde2a8
Parents: f1ee90c
Author: zentol <s....@web.de>
Authored: Wed Apr 22 16:14:16 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Apr 22 16:45:32 2015 +0200

----------------------------------------------------------------------
 docs/apis/programming_guide.md | 101 +++++++++++++++++++++++++++++++++++-
 1 file changed, 99 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e61775f/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 1d6723f..7d21200 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -1320,8 +1320,9 @@ data.map (new RichMapFunction[String, Int] {
 
 Rich functions provide, in addition to the user-defined function (map,
 reduce, etc), four methods: `open`, `close`, `getRuntimeContext`, and
-`setRuntimeContext`. These are useful for creating and finalizing
-local state, accessing broadcast variables (see
+`setRuntimeContext`. These are useful for parameterizing the function
+(see [Passing Parameters to Functions](#passing-parameters-to-functions)),
+creating and finalizing local state, accessing broadcast variables (see
 [Broadcast Variables](#broadcast-variables), and for accessing runtime
 information such as accumulators and counters (see
 [Accumulators and Counters](#accumulators--counters), and information
@@ -2584,6 +2585,102 @@ of a function, or use the `withParameters(...)` method to pass in a configuratio
 
 [Back to top](#top)
 
+Passing Parameters to Functions
+-------------------
+
+Parameters can be passed to functions using either the constructor or the `withParameters(Configuration)` method. The parameters are serialized
+as part of the function object and shipped to all parallel task instances.
+
+#### Via Constructor
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Integer> toFilter = env.fromElements(1, 2, 3);
+
+toFilter.filter(new MyFilter(2));
+
+private static class MyFilter implements FilterFunction<Integer> {
+
+  private final int limit;
+
+  public MyFilter(int limit) {
+    this.limit = limit;
+  }
+
+  @Override
+  public boolean filter(Integer value) throws Exception {
+    return value > limit;
+  }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val toFilter = env.fromElements(1, 2, 3)
+
+toFilter.filter(new MyFilter(2))
+
+class MyFilter(limit: Int) extends FilterFunction[Int] {
+  override def filter(value: Int): Boolean = {
+    value > limit
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+#### Via `withParameters(Configuration)`
+
+This method takes a Configuration object as an argument and passes it to the `open()` method of the [rich function](#rich-functions).
+The Configuration object is a map from String keys to values of different types.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Integer> toFilter = env.fromElements(1, 2, 3);
+
+Configuration config = new Configuration();
+config.setInteger("limit", 2);
+
+toFilter.filter(new RichFilterFunction<Integer>() {
+    private int limit;
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+      limit = parameters.getInteger("limit", 0);
+    }
+
+    @Override
+    public boolean filter(Integer value) throws Exception {
+      return value > limit;
+    }
+}).withParameters(config);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val toFilter = env.fromElements(1, 2, 3)
+
+val c = new Configuration()
+c.setInteger("limit", 2)
+
+toFilter.filter(new RichFilterFunction[Int]() {
+    var limit = 0
+
+    override def open(config: Configuration): Unit = {
+      limit = config.getInteger("limit", 0)
+    }
+
+    def filter(in: Int): Boolean = {
+      in > limit
+    }
+}).withParameters(c)
+{% endhighlight %}
+</div>
+</div>
+
+[Back to top](#top)
 
 Program Packaging & Distributed Execution
 -----------------------------------------