Posted to commits@flink.apache.org by uc...@apache.org on 2015/04/22 16:17:23 UTC

[26/30] flink git commit: [docs] Change doc layout

http://git-wip-us.apache.org/repos/asf/flink/blob/f1ee90cc/docs/apis/python.md
----------------------------------------------------------------------
diff --git a/docs/apis/python.md b/docs/apis/python.md
new file mode 100644
index 0000000..5bea544
--- /dev/null
+++ b/docs/apis/python.md
@@ -0,0 +1,606 @@
+---
+title: "Python Programming Guide"
+is_beta: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<a href="#top"></a>
+
+Analysis programs in Flink are regular programs that implement transformations on data sets
+(e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain
+sources (e.g., by reading files, or from collections). Results are returned via sinks, which may for
+example write the data to (distributed) files, or to standard output (for example the command line
+terminal). Flink programs run in a variety of contexts: standalone, or embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+In order to create your own Flink program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references for additional
+operations and advanced features.
+
+* This will be replaced by the TOC
+{:toc}
+
+Example Program
+---------------
+
+The following program is a complete, working example of WordCount. You can copy & paste the code
+to run it locally.
+
+{% highlight python %}
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, STRING
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+
+class Adder(GroupReduceFunction):
+  def reduce(self, iterator, collector):
+    count, word = iterator.next()
+    count += sum([x[0] for x in iterator])
+    collector.collect((count, word))
+
+if __name__ == "__main__":
+  env = get_environment()
+  data = env.from_elements("Who's there?",
+   "I think I hear them. Stand, ho! Who's there?")
+  
+  data \
+    .flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, STRING)) \
+    .group_by(1) \
+    .reduce_group(Adder(), (INT, STRING), combinable=True) \
+    .output()
+  
+  env.execute(local=True)
+{% endhighlight %}
+
+[Back to top](#top)
+
+Program Skeleton
+----------------
+
+As we already saw in the example, Flink programs look like regular Python
+programs with an `if __name__ == "__main__":` block. Each program consists of the same basic parts:
+
+1. Obtain an `Environment`,
+2. Load/create the initial data,
+3. Specify transformations on this data,
+4. Specify where to put the results of your computations, and
+5. Execute your program.
+
+We will now give an overview of each of these steps, but please refer to the respective sections for
+more details.
+
+
+The `Environment` is the basis for all Flink programs. You can
+obtain one using the `get_environment()` method from `flink.plan.Environment`:
+
+{% highlight python %}
+get_environment()
+{% endhighlight %}
+
+For specifying data sources the execution environment has several methods
+to read from files. To just read a text file as a sequence of lines, you can use:
+
+{% highlight python %}
+env = get_environment()
+text = env.read_text("file:///path/to/file")
+{% endhighlight %}
+
+This will give you a DataSet on which you can then apply transformations. For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
+
+Once you have a DataSet you can apply transformations to create a new
+DataSet which you can then write to a file, transform again, or
+combine with other DataSets. You apply transformations by calling
+methods on DataSet with your own custom transformation function. For example,
+a map transformation looks like this:
+
+{% highlight python %}
+data.map(lambda x: x*2, INT)
+{% endhighlight %}
+
+This will create a new DataSet by doubling every value in the original DataSet. 
+For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
+
+Once you have a DataSet that needs to be written to disk you can call one
+of these methods on DataSet:
+
+{% highlight python %}
+data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
+data.write_csv("<file-path>", WriteMode=Constants.NO_OVERWRITE, line_delimiter='\n', field_delimiter=',')
+data.output()
+{% endhighlight %}
+
+The last method is only useful for developing/debugging on a local machine;
+it prints the contents of the DataSet to standard output. (Note that on
+a cluster, the result goes to the standard out stream of the cluster nodes and ends
+up in the *.out* files of the workers.)
+The first two do as their names suggest.
+Please refer to [Data Sinks](#data-sinks) for more information on writing to files.
+
+Once you specified the complete program you need to call `execute` on
+the `Environment`. This will either execute on your local machine or submit your program 
+for execution on a cluster, depending on how Flink was started. You can force
+a local execution by using `execute(local=True)`.
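+
+Putting these five steps together, a minimal end-to-end sketch (using only calls shown in this guide; the data and the doubling map are placeholders for your own sources and transformations) looks like this:
+
+{% highlight python %}
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT
+
+if __name__ == "__main__":
+    # 1. obtain an Environment
+    env = get_environment()
+    # 2. load/create the initial data
+    data = env.from_elements(1, 2, 3)
+    # 3. specify transformations on this data
+    doubled = data.map(lambda x: x * 2, INT)
+    # 4. specify where to put the results
+    doubled.output()
+    # 5. execute the program (forcing local execution here)
+    env.execute(local=True)
+{% endhighlight %}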
+
+[Back to top](#top)
+
+Project setup
+---------------
+
+Apart from setting up Flink, no additional work is required. The Python package can be found in the /resource folder of your Flink distribution. The flink package, along with the plan and optional packages, is automatically distributed across the cluster via HDFS when running a job.
+
+The Python API was tested on Linux systems that have Python 2.7 or 3.4 installed.
+
+[Back to top](#top)
+
+Lazy Evaluation
+---------------
+
+All Flink programs are executed lazily: When the program's main method is executed, the data loading
+and transformations do not happen directly. Rather, each operation is created and added to the
+program's plan. The operations are actually executed when one of the `execute()` methods is invoked
+on the Environment object. Whether the program is executed locally or on a cluster depends
+on the environment of the program.
+
+The lazy evaluation lets you construct sophisticated programs that Flink executes as one
+holistically planned unit.
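+
+To illustrate, in the following sketch nothing is read or computed until the final line; each call merely extends the plan:
+
+{% highlight python %}
+env = get_environment()
+data = env.from_elements(1, 2, 3)         # nothing is created yet
+doubled = data.map(lambda x: x * 2, INT)  # the map is only added to the plan
+doubled.output()                          # the sink is registered, still nothing runs
+env.execute()                             # only now is the whole plan executed
+{% endhighlight %}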
+
+[Back to top](#top)
+
+
+Transformations
+---------------
+
+Data transformations transform one or more DataSets into a new DataSet. Programs can combine
+multiple transformations into sophisticated assemblies.
+
+This section gives a brief overview of the available transformations. The [transformations
+documentation](dataset_transformations.html) has a full description of all transformations with
+examples.
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>Map</strong></td>
+      <td>
+        <p>Takes one element and produces one element.</p>
+{% highlight python %}
+data.map(lambda x: x * 2, INT)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>FlatMap</strong></td>
+      <td>
+        <p>Takes one element and produces zero, one, or more elements. </p>
+{% highlight python %}
+data.flat_map(
+  lambda x, c: [(1, word) for word in x.lower().split()],
+  (INT, STRING))
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>MapPartition</strong></td>
+      <td>
+        <p>Transforms a parallel partition in a single function call. The function gets the partition
+        as an `Iterator` and can produce an arbitrary number of result values. The number of
+        elements in each partition depends on the degree-of-parallelism and previous operations.</p>
+{% highlight python %}
+data.map_partition(lambda x,c: [value * 2 for value in x], INT)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Filter</strong></td>
+      <td>
+        <p>Evaluates a boolean function for each element and retains those for which the function
+        returns true.</p>
+{% highlight python %}
+data.filter(lambda x: x > 1000)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Reduce</strong></td>
+      <td>
+        <p>Combines a group of elements into a single element by repeatedly combining two elements
+        into one. Reduce may be applied on a full data set, or on a grouped data set.</p>
+{% highlight python %}
+data.reduce(lambda x, y: x + y)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>ReduceGroup</strong></td>
+      <td>
+        <p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a
+        full data set, or on a grouped data set.</p>
+{% highlight python %}
+class Adder(GroupReduceFunction):
+  def reduce(self, iterator, collector):
+    count, word = iterator.next()
+    count += sum([x[0] for x in iterator])
+    collector.collect((count, word))
+
+data.reduce_group(Adder(), (INT, STRING))
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Join</strong></td>
+      <td>
+        <p>Joins two data sets by creating all pairs of elements that are equal on their keys.
+        Optionally uses a JoinFunction to turn the pair of elements into a single element.
+        See <a href="#specifying-keys">keys</a> on how to define join keys.</p>
+{% highlight python %}
+# In this case tuple fields are used as keys. 
+# "0" is the join field on the first tuple
+# "1" is the join field on the second tuple.
+result = input1.join(input2).where(0).equal_to(1)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>CoGroup</strong></td>
+      <td>
+        <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
+        fields and then joins the groups. The transformation function is called per pair of groups.
+        See <a href="#specifying-keys">keys</a> on how to define coGroup keys.</p>
+{% highlight python %}
+data1.co_group(data2).where(0).equal_to(1)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Cross</strong></td>
+      <td>
+        <p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of
+        elements. Optionally uses a CrossFunction to turn the pair of elements into a single
+        element.</p>
+{% highlight python %}
+result = data1.cross(data2)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Union</strong></td>
+      <td>
+        <p>Produces the union of two data sets.</p>
+{% highlight python %}
+data.union(data2)
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+[Back to Top](#top)
+
+
+Specifying Keys
+-------------
+
+Some transformations (like Join or CoGroup) require that a key is defined on
+their argument DataSets, while other transformations (Reduce, GroupReduce) allow the DataSet to be grouped on a key before they are
+applied.
+
+A DataSet is grouped as follows:
+{% highlight python %}
+reduced = data \
+  .group_by(<define key here>) \
+  .reduce_group(<do something>)
+{% endhighlight %}
+
+The data model of Flink is not based on key-value pairs. Therefore,
+you do not need to physically pack the data set types into keys and
+values. Keys are "virtual": they are defined as functions over the
+actual data to guide the grouping operator.
+
+### Define keys for Tuples
+{:.no_toc}
+
+The simplest case is grouping a data set of Tuples on one or more
+fields of the Tuple:
+{% highlight python %}
+reduced = data \
+  .group_by(0) \
+  .reduce_group(<do something>)
+{% endhighlight %}
+
+The data set is grouped on the first field of the tuples. 
+The group-reduce function will thus receive groups of tuples with
+the same value in the first field.
+
+{% highlight python %}
+grouped = data \
+  .group_by(0,1) \
+  .reduce(<do something>)
+{% endhighlight %}
+
+The data set is grouped on the composite key consisting of the first and the
+second fields, therefore the reduce function will receive groups
+with the same value for both fields.
+
+A note on nested Tuples: If you have a DataSet with a nested tuple,
+specifying `group_by(<index of tuple>)` will cause the system to use the full tuple as a key.
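+
+For illustration (a hedged sketch; the data and the reduce function are placeholders), grouping the following DataSet on position 1 uses the entire nested tuple as the key:
+
+{% highlight python %}
+# group_by(1) groups on the whole nested tuple (1, 2), not on its first field
+data = env.from_elements(("a", (1, 2)), ("b", (1, 2)), ("c", (3, 4)))
+data.group_by(1).reduce(lambda x, y: x)
+{% endhighlight %}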
+
+[Back to top](#top)
+
+
+Passing Functions to Flink
+--------------------------
+
+Certain operations require user-defined functions, but all of them accept both lambda functions and rich functions as arguments.
+
+{% highlight python %}
+data.filter(lambda x: x > 5)
+{% endhighlight %}
+
+{% highlight python %}
+class Filter(FilterFunction):
+    def filter(self, value):
+        return value > 5
+
+data.filter(Filter())
+{% endhighlight %}
+
+Rich functions allow the use of imported functions, provide access to broadcast variables,
+can be parameterized using `__init__()`, and are the go-to option for complex functions.
+They are also the only way to define an optional `combine` function for a reduce operation.
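+
+For example, a rich function can be parameterized through its constructor (a minimal sketch; the class name and the `limit` parameter are illustrative, and the import path is assumed by analogy with `flink.functions.GroupReduceFunction`):
+
+{% highlight python %}
+# import path assumed by analogy with GroupReduceFunction
+from flink.functions.FilterFunction import FilterFunction
+
+class GreaterThan(FilterFunction):
+    def __init__(self, limit):
+        super(GreaterThan, self).__init__()
+        self.limit = limit  # parameterize the function via __init__()
+
+    def filter(self, value):
+        return value > self.limit
+
+data.filter(GreaterThan(5))
+{% endhighlight %}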
+
+Lambda functions allow the easy insertion of one-liners. Note that a lambda function has to return
+an iterable if the operation can return multiple values (this applies to all functions that receive a collector argument).
+
+Flink requires type information at the time when it prepares the program for execution
+(when the main method of the program is called). This is done by passing an exemplary
+object that has the desired type. This also holds for tuples.
+
+{% highlight python %}
+(INT, STRING)
+{% endhighlight %}
+
+would denote a tuple containing an int and a string. Note that for operations that work strictly on tuples (like cross), no parentheses are required.
+
+There are a few constants defined in flink.plan.Constants (such as INT and STRING) that allow declaring such types in a more readable fashion.
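+
+For instance, a map producing such an (int, string) pair would pass the tuple of constants as its type argument (the lambda is illustrative):
+
+{% highlight python %}
+# the second argument declares the result type: a tuple of an int and a string
+data.map(lambda x: (1, x), (INT, STRING))
+{% endhighlight %}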
+
+[Back to top](#top)
+
+Data Types
+----------
+
+Flink's Python API currently only supports primitive Python types (int, float, bool, string) and byte arrays.
+
+#### Tuples/Lists
+
+You can use tuples (or lists) for composite types. Python tuples are mapped to the Flink Tuple type, which contains
+a fixed number of fields of various types (up to 25). Every field of a tuple can be a primitive type, including further tuples, resulting in nested tuples.
+
+{% highlight python %}
+word_counts = env.from_elements(("hello", 1), ("world",2))
+
+counts = word_counts.map(lambda x: x[1], INT)
+{% endhighlight %}
+
+When working with operators that require a key for grouping or matching records,
+Tuples let you simply specify the positions of the fields to be used as key. You can specify more
+than one position to use composite keys (see [Section Data Transformations](#transformations)).
+
+{% highlight python %}
+wordCounts \
+    .group_by(0) \
+    .reduce(MyReduceFunction())
+{% endhighlight %}
+
+[Back to top](#top)
+
+Data Sources
+------------
+
+Data sources create the initial data sets, such as from files or from collections.
+
+File-based:
+
+- `read_text(path)` - Reads files line-wise and returns them as Strings.
+- `read_csv(path, type)` - Parses files of comma (or another char) delimited fields.
+  Returns a DataSet of tuples. Supports the basic Java types and their Value counterparts as field
+  types.
+
+Collection-based:
+
+- `from_elements(*args)` - Creates a data set from the given elements. All elements must be of the same type.
+
+**Examples**
+
+{% highlight python %}
+env = get_environment()
+
+# read text file from the local file system
+localLines = env.read_text("file:///path/to/my/textfile")
+
+# read text file from a HDFS running at nnHost:nnPort
+hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")
+
+# read a CSV file with three fields
+csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
+
+# create a set from some given elements
+values = env.from_elements("Foo", "bar", "foobar", "fubar")
+{% endhighlight %}
+
+[Back to top](#top)
+
+Data Sinks
+----------
+
+Data sinks consume DataSets and are used to store or return them:
+
+- `write_text()` - Writes elements line-wise as Strings. The Strings are
+  obtained by calling the *str()* method of each element.
+- `write_csv(...)` - Writes tuples as comma-separated value files. Row and field
+  delimiters are configurable. The value for each field comes from the *str()* method of the objects.
+- `output()` - Prints the *str()* value of each element on the
+  standard out.
+
+A DataSet can be input to multiple operations. Programs can write or print a data set and at the
+same time run additional transformations on them.
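+
+A short sketch of this pattern (paths are placeholders):
+
+{% highlight python %}
+data = env.from_elements(1, 2, 3)
+
+# write the data set out ...
+data.write_text("file:///path/to/result")
+
+# ... and keep transforming the same DataSet at the same time
+data.map(lambda x: x * 2, INT).output()
+{% endhighlight %}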
+
+**Examples**
+
+Standard data sink methods:
+
+{% highlight python %}
+# write DataSet to a file on the local file system
+textData.write_text("file:///my/result/on/localFS")
+
+# write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
+textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")
+
+# write DataSet to a file and overwrite the file if it exists
+textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
+
+# tuples as lines with pipe as the separator "a|b|c"
+values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")
+
+# this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
+values.write_text("file:///path/to/the/result/file")
+{% endhighlight %}
+
+[Back to top](#top)
+
+Broadcast Variables
+-------------------
+
+Broadcast variables allow you to make a data set available to all parallel instances of an
+operation, in addition to the regular input of the operation. This is useful for auxiliary data
+sets, or data-dependent parameterization. The data set will then be accessible at the operator as a
+Collection.
+
+- **Broadcast**: broadcast sets are registered by name via `with_broadcast_set(DataSet, String)`
+- **Access**: accessible via `self.context.get_broadcast_variable(String)` at the target operator
+
+{% highlight python %}
+class MapperBcv(MapFunction):
+    def map(self, value):
+        factor = self.context.get_broadcast_variable("bcv")[0][0]
+        return value * factor
+
+# 1. The DataSet to be broadcasted
+toBroadcast = env.from_elements(1, 2, 3) 
+data = env.from_elements(1, 2)
+
+# 2. Broadcast the DataSet
+data.map(MapperBcv(), INT).with_broadcast_set("bcv", toBroadcast) 
+{% endhighlight %}
+
+Make sure that the names (`bcv` in the previous example) match when registering and
+accessing broadcasted data sets.
+
+**Note**: As the content of broadcast variables is kept in-memory on each node, it should not become
+too large. For simpler things like scalar values you can simply parameterize the rich function.
+
+[Back to top](#top)
+
+Parallel Execution
+------------------
+
+This section describes how the parallel execution of programs can be configured in Flink. A Flink
+program consists of multiple tasks (operators, data sources, and sinks). A task is split into
+several parallel instances for execution and each parallel instance processes a subset of the task's
+input data. The number of parallel instances of a task is called its *parallelism* or *degree of
+parallelism (DOP)*.
+
+The degree of parallelism of a task can be specified in Flink on different levels.
+
+### Execution Environment Level
+
+Flink programs are executed in the context of an [execution environment](#program-skeleton). An
+execution environment defines a default parallelism for all operators, data sources, and data sinks
+it executes. Execution environment parallelism can be overridden by explicitly configuring the
+parallelism of an operator.
+
+The default parallelism of an execution environment can be specified by calling the
+`set_degree_of_parallelism()` method. To execute all operators, data sources, and data sinks of the
+[WordCount](#example-program) example program with a parallelism of `3`, set the default parallelism of the
+execution environment as follows:
+
+{% highlight python %}
+env = get_environment()
+env.set_degree_of_parallelism(3)
+
+text.flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, STRING)) \
+    .group_by(1) \
+    .reduce_group(Adder(), (INT, STRING), combinable=True) \
+    .output()
+
+env.execute()
+{% endhighlight %}
+
+### System Level
+
+A system-wide default parallelism for all execution environments can be defined by setting the
+`parallelization.degree.default` property in `./conf/flink-conf.yaml`. See the
+[Configuration](config.html) documentation for details.
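+
+For example, a system-wide default parallelism of 3 (the value is chosen for illustration) would be configured as:
+
+{% highlight yaml %}
+parallelization.degree.default: 3
+{% endhighlight %}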
+
+[Back to top](#top)
+
+Executing Plans
+---------------
+
+To run the plan with Flink, go to your Flink distribution and run the pyflink.sh script from the /bin folder.
+Use pyflink2.sh for Python 2.7 and pyflink3.sh for Python 3.4. The script containing the plan has to be passed
+as the first argument, followed by a number of additional Python packages, and finally, separated by `-`, additional
+arguments that will be fed to the script.
+
+{% highlight bash %}
+./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX>]][ - <param1>[ <paramX>]]
+{% endhighlight %}
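+
+For instance, a hypothetical invocation that runs a plan with Python 3, ships one extra package, and passes one argument to the script might look like:
+
+{% highlight bash %}
+./bin/pyflink3.sh WordCount.py my_package.py - file:///path/to/input
+{% endhighlight %}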
+
+[Back to top](#top)
+
+Debugging
+---------------
+
+If you are running Flink programs locally, you can debug your program following this guide.
+First you have to enable debugging by setting the debug switch in the `env.execute(debug=True)` call. After
+submitting your program, open the jobmanager log file and look for a line that says
+`Waiting for external Process : <taskname>. Run python /tmp/flink/executor.py <port>`. Now open `/tmp/flink` in your Python
+IDE and run `executor.py <port>`.
+
+[Back to top](#top)