Posted to commits@flink.apache.org by uc...@apache.org on 2015/04/22 16:47:37 UTC
flink git commit: [FLINK-1422] [docs] Add function parametrization docs
Repository: flink
Updated Branches:
refs/heads/master f1ee90ccb -> 4e61775f7
[FLINK-1422] [docs] Add function parametrization docs
This closes #350.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e61775f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e61775f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e61775f
Branch: refs/heads/master
Commit: 4e61775f71da50cc69a66ac26ca45e23affde2a8
Parents: f1ee90c
Author: zentol <s....@web.de>
Authored: Wed Apr 22 16:14:16 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Apr 22 16:45:32 2015 +0200
----------------------------------------------------------------------
docs/apis/programming_guide.md | 101 +++++++++++++++++++++++++++++++++++-
1 file changed, 99 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4e61775f/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 1d6723f..7d21200 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -1320,8 +1320,9 @@ data.map (new RichMapFunction[String, Int] {
Rich functions provide, in addition to the user-defined function (map,
reduce, etc), four methods: `open`, `close`, `getRuntimeContext`, and
-`setRuntimeContext`. These are useful for creating and finalizing
-local state, accessing broadcast variables (see
+`setRuntimeContext`. These are useful for parameterizing the function
+(see [Passing Parameters to Functions](#passing-parameters-to-functions)),
+creating and finalizing local state, accessing broadcast variables (see
[Broadcast Variables](#broadcast-variables), and for accessing runtime
information such as accumulators and counters (see
[Accumulators and Counters](#accumulators--counters), and information
@@ -2584,6 +2585,102 @@ of a function, or use the `withParameters(...)` method to pass in a configuratio
[Back to top](#top)
+Passing Parameters to Functions
+-------------------
+
+Parameters can be passed to functions using either the constructor or the `withParameters(Configuration)` method. The parameters are serialized
+as part of the function object and shipped to all parallel task instances.
+
+#### Via Constructor
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Integer> toFilter = env.fromElements(1, 2, 3);
+
+toFilter.filter(new MyFilter(2));
+
+private static class MyFilter implements FilterFunction<Integer> {
+
+    private final int limit;
+
+    public MyFilter(int limit) {
+        this.limit = limit;
+    }
+
+    @Override
+    public boolean filter(Integer value) throws Exception {
+        return value > limit;
+    }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val toFilter = env.fromElements(1, 2, 3)
+
+toFilter.filter(new MyFilter(2))
+
+class MyFilter(limit: Int) extends FilterFunction[Int] {
+  override def filter(value: Int): Boolean = {
+    value > limit
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+#### Via `withParameters(Configuration)`
+
+This method takes a Configuration object as an argument, which is passed to the `open()` method of the [rich function](#rich-functions).
+The Configuration object is a map from String keys to values of different types.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Integer> toFilter = env.fromElements(1, 2, 3);
+
+Configuration config = new Configuration();
+config.setInteger("limit", 2);
+
+toFilter.filter(new RichFilterFunction<Integer>() {
+    private int limit;
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        limit = parameters.getInteger("limit", 0);
+    }
+
+    @Override
+    public boolean filter(Integer value) throws Exception {
+        return value > limit;
+    }
+}).withParameters(config);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val toFilter = env.fromElements(1, 2, 3)
+
+val c = new Configuration()
+c.setInteger("limit", 2)
+
+toFilter.filter(new RichFilterFunction[Int]() {
+  var limit = 0
+
+  override def open(config: Configuration): Unit = {
+    limit = config.getInteger("limit", 0)
+  }
+
+  def filter(in: Int): Boolean = {
+    in > limit
+  }
+}).withParameters(c)
+{% endhighlight %}
+</div>
+</div>
+
+[Back to top](#top)
Program Packaging & Distributed Execution
-----------------------------------------
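
Note on the "Via Constructor" section added by this patch: the text says parameters are serialized as part of the function object and shipped to the parallel task instances. The sketch below illustrates that idea without any Flink dependency. It assumes plain Java serialization as a stand-in for the shipping step, and the names `ParameterShippingSketch`, `IntFilter`, `LimitFilter`, and `roundTrip` are hypothetical, introduced only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ParameterShippingSketch {

    // Hypothetical stand-in for a serializable user function interface.
    interface IntFilter extends Serializable {
        boolean filter(int value);
    }

    // The constructor argument becomes a field, so it is part of the
    // object's serialized state.
    static final class LimitFilter implements IntFilter {
        private static final long serialVersionUID = 1L;

        private final int limit;

        LimitFilter(int limit) {
            this.limit = limit;
        }

        @Override
        public boolean filter(int value) {
            return value > limit;
        }
    }

    // Serializes the object to bytes and reads it back, imitating
    // (as an assumption) what happens when a function is shipped to a task.
    static <T extends Serializable> T roundTrip(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        IntFilter shipped = roundTrip(new LimitFilter(2));
        // The deserialized copy still carries the limit it was constructed with.
        System.out.println(shipped.filter(3));
        System.out.println(shipped.filter(1));
    }
}
```

A practical consequence of this design is that every field of a function parameterized through its constructor has to be serializable itself. A non-serializable field would make the write step in `roundTrip` fail.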