Posted to commits@flink.apache.org by se...@apache.org on 2016/09/21 15:54:19 UTC

[6/6] flink git commit: [docs] Update docs on data types and serialization, to include type hints, type registration, and serializer registration.

[docs] Update docs on data types and serialization, to include type hints, type registration, and serializer registration.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5160568
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5160568
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5160568

Branch: refs/heads/master
Commit: b516056869a825db0913ff852c071de198f4d390
Parents: 5f67b54
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 20 23:10:44 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 21 17:53:33 2016 +0200

----------------------------------------------------------------------
 docs/dev/types_serialization.md | 153 +++++++++++++++++++++++------------
 1 file changed, 102 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5160568/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index 8a32491..4b8e25f 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -1,5 +1,5 @@
 ---
-title: "Data Types"
+title: "Data Types & Serialization"
 nav-id: types
 nav-parent_id: dev
 nav-pos: 9
@@ -23,13 +23,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Flink handles types in a unique way, containing its own type descriptors,
-generic type extraction, and type serialization framework.
-This document describes the concepts and the rationale behind them.
-
-There are fundamental differences in the way that the Scala API and
-the Java API handle type information, so most of the issues described
-here relate only to one of the to APIs.
+Apache Flink handles data types and serialization in a unique way, with its own type descriptors,
+generic type extraction, and type serialization framework. This document describes the concepts and the rationale behind them.
 
 * This will be replaced by the TOC
 {:toc}
@@ -37,21 +32,43 @@ here relate only to one of the to APIs.
 
 ## Type handling in Flink
 
-Flink tries to know as much information about what types enter and leave user functions as possible.
-This stands in contrast to the approach to just assuming nothing and letting the
-programming language and serialization framework handle all types dynamically.
+Flink tries to infer a lot of information about the data types that are exchanged and stored during the distributed computation.
+Think about it like a database that infers the schema of tables. In most cases, Flink infers all necessary information seamlessly
+by itself. Having the type information allows Flink to do some cool things:
+
+* Using POJO types and grouping / joining / aggregating them by referring to field names (like `dataStream.keyBy("username")`).
+  The type information allows Flink to check (for typos and type compatibility) early rather than failing later at runtime.
+
+* The more Flink knows about data types, the better the serialization and data layout schemes are.
+  That is quite important for the memory usage paradigm in Flink (work on serialized data inside/outside the heap wherever possible
+  and make serialization very cheap).
+
+* Finally, it also spares users in the majority of cases from worrying about serialization frameworks and having to register types.
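The second point can be illustrated without Flink at all. The plain-Java sketch below compares a serializer that knows the record layout in advance with generic Java serialization, which must also describe the class in the byte stream. The `UserScore` and `SerializationSizeDemo` names are hypothetical, and neither class uses Flink's serializers.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical record type, used only for this comparison.
class UserScore implements Serializable {
    private static final long serialVersionUID = 1L;

    final String user;
    final double score;

    UserScore(String user, double score) {
        this.user = user;
        this.score = score;
    }
}

class SerializationSizeDemo {
    // Layout-aware: the serializer knows the record is (String, double),
    // so it writes only the field values and nothing about the class.
    static int layoutAwareSize(UserScore record) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(record.user);      // 2-byte length prefix + modified UTF-8 bytes
            out.writeDouble(record.score);  // 8 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Generic: Java serialization also writes a class descriptor
    // (class name, serialVersionUID, field names and types) into the stream.
    static int genericSize(UserScore record) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(record);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }
}
```

For `new UserScore("alice", 4.5)`, the layout-aware form takes 15 bytes: a 2-byte length prefix, 5 string bytes, and 8 bytes for the double. The generic form is considerably larger because of the class descriptor. This is only an analogy for why type knowledge helps. It does not describe Flink's serializer formats.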
+
+In general, the information about data types is needed during the *pre-flight phase* - that is, when the program's calls on `DataStream`
+and `DataSet` are made, and before any call to `execute()`, `print()`, `count()`, or `collect()`.
+
+
+## Most Frequent Issues
+
+The most frequent issues where users need to interact with Flink's data type handling are:
 
-* To allow using POJOs and grouping/joining them by referring to field names, Flink needs the type
-  information to make checks (for typos and type compatibility) before the job is executed.
+* **Registering subtypes:** If the function signatures describe only the supertypes, but the functions actually use subtypes of those during execution,
+  making Flink aware of these subtypes can increase performance considerably.
+  For that, call `.registerType(clazz)` on the `StreamExecutionEnvironment` or `ExecutionEnvironment` for each subtype.
 
-* The more we know, the better serialization and data layout schemes the compiler/optimizer can develop.
-  That is quite important for the memory usage paradigm in Flink (work on serialized data
-  inside/outside the heap and make serialization very cheap).
+* **Registering custom serializers:** Flink falls back to [Kryo](https://github.com/EsotericSoftware/kryo) for the types that it does not handle transparently
+  by itself. Not all types are seamlessly handled by Kryo (and thus by Flink). For example, many Google Guava collection types do not work well
+  by default. The solution is to register additional serializers for the types that cause problems.
+  Call `.getConfig().addDefaultKryoSerializer(clazz, serializer)` on the `StreamExecutionEnvironment` or `ExecutionEnvironment`.
+  Additional Kryo serializers are available in many libraries.
 
-* For the upcoming logical programs (see roadmap draft) we need this to know the "schema" of functions.
+* **Adding Type Hints:** Sometimes, when Flink cannot infer the generic types despite all tricks, a user must pass a *type hint*. That is generally
+  only necessary in the Java API. The [Type Hints Section](#type-hints-in-the-java-api) describes that in more detail.
 
-* Finally, it also spares users having to worry about serialization frameworks and having to register
-  types at those frameworks.
+* **Manually creating a `TypeInformation`:** This may be necessary for some API calls where it is not possible for Flink to infer
+  the data types due to Java's generic type erasure. See [Creating a TypeInformation or TypeSerializer](#creating-a-typeinformation-or-typeserializer)
+  for details.
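The erasure referred to in the last item is easy to observe in plain Java. In this minimal sketch, two lists with different element types share one runtime class, so the element type cannot be recovered from the objects themselves. This is why Flink sometimes needs a type hint or an explicitly created `TypeInformation`. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class ErasureDemo {
    // The type arguments <String> and <Integer> exist only at compile time;
    // at runtime both lists are plain java.util.ArrayList instances.
    static boolean sameRuntimeClass() {
        List<String> names = new ArrayList<>();
        List<Integer> counts = new ArrayList<>();
        return names.getClass().equals(counts.getClass());
    }

    // The runtime class name carries no trace of the element type.
    static String runtimeClassName() {
        return new ArrayList<String>().getClass().getName();
    }
}
```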
 
 
 ## Flink's TypeInformation class
@@ -75,7 +92,7 @@ Internally, Flink makes the following distinctions between types:
 
   * POJOs: classes that follow a certain bean-like pattern
 
-* Scala auxiliary types (Option, Either, Lists, Maps, ...)
+* Auxiliary types (Option, Either, Lists, Maps, ...)
 
 * Generic types: These will not be serialized by Flink itself, but by Kryo.
 
@@ -84,18 +101,66 @@ names in the definition of keys: `dataSet.join(another).where("name").equalTo("p
 They are also transparent to the runtime and can be handled very efficiently by Flink.
 
 
-**Rules for POJO types**
+#### Rules for POJO types
 
 Flink recognizes a data type as a POJO type (and allows "by-name" field referencing) if the following
 conditions are fulfilled:
 
 * The class is public and standalone (no non-static inner class)
 * The class has a public no-argument constructor
-* All fields in the class (and all superclasses) are either public or
-  or have a public getter and a setter method that follows the Java beans
+* All fields in the class (and all superclasses) are either public (and non-final)
+  or have public getter and setter methods that follow the Java beans
   naming conventions for getters and setters.
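As a rough illustration of these rules, the following sketch checks a class using plain Java reflection. `PojoRules` and `PojoExamples` are hypothetical names. The check is a simplified approximation of the three rules above and not Flink's own analysis. Skipping static fields is an assumption of the sketch.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

class PojoRules {
    // Simplified approximation of the three POJO rules; Flink's actual check may differ.
    static boolean followsPojoRules(Class<?> clazz) {
        // Rule 1: public and standalone (top-level or static nested, not a non-static inner class).
        if (!Modifier.isPublic(clazz.getModifiers())) return false;
        if (clazz.isMemberClass() && !Modifier.isStatic(clazz.getModifiers())) return false;
        // Rule 2: a public no-argument constructor.
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: every field in the class and its superclasses is public and non-final,
        // or has a public getter and setter following the Java beans naming conventions.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (Modifier.isStatic(mod)) continue; // assumption: static fields are not record state
                boolean publicMutable = Modifier.isPublic(mod) && !Modifier.isFinal(mod);
                if (!publicMutable && !hasGetterAndSetter(clazz, f)) return false;
            }
        }
        return true;
    }

    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        boolean getter = hasPublicMethod(clazz, "get" + suffix)
                || (f.getType() == boolean.class && hasPublicMethod(clazz, "is" + suffix));
        boolean setter = hasPublicMethod(clazz, "set" + suffix, f.getType());
        return getter && setter;
    }

    private static boolean hasPublicMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            clazz.getMethod(name, params); // getMethod only finds public methods
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}

class PojoExamples {
    // Passes: public static nested class, public no-arg constructor,
    // one public field and one private field with bean accessors.
    public static class WordCount {
        public String word;
        private long count;

        public WordCount() {}

        public long getCount() { return count; }
        public void setCount(long count) { this.count = count; }
    }

    // Fails rule 3: the public field is final and has no setter.
    public static class Immutable {
        public final String id = "x";
        public Immutable() {}
    }

    // Fails rule 1: a non-static inner class.
    public class Inner {
        public Inner() {}
    }
}
```

Under these rules, `WordCount` passes. `Immutable` fails the field rule, and the inner class `Inner` fails the first rule.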
 
 
+#### Creating a TypeInformation or TypeSerializer
+
+To create a TypeInformation object for a type, use the language-specific way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+Because Java generally erases generic type information, you need to pass the type when constructing the
+TypeInformation:
+
+For non-generic types, you can pass the Class:
+{% highlight java %}
+TypeInformation<String> info = TypeInformation.of(String.class);
+{% endhighlight %}
+
+For generic types, you need to "capture" the generic type information via the `TypeHint`:
+{% highlight java %}
+TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
+{% endhighlight %}
+Internally, this creates an anonymous subclass of the TypeHint that captures the generic information to preserve it
+until runtime.
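The anonymous-subclass mechanism can be reproduced in plain Java. The sketch below uses a hypothetical `TypeCapture` class rather than Flink's `TypeHint`. Because `new TypeCapture<...>(){}` declares a new subclass, the compiler records its generic superclass, including the type arguments, in the class file. Reflection can then read it back at runtime.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for the idea behind TypeHint; create it only as an anonymous subclass.
abstract class TypeCapture<T> {
    private final Type captured;

    protected TypeCapture() {
        // For `new TypeCapture<Map<String, Double>>(){}`, the generic superclass of the
        // anonymous class is TypeCapture<Map<String, Double>>, which survives erasure.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                    "Create TypeCapture as an anonymous subclass with a type argument");
        }
        captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    Type getCapturedType() {
        return captured;
    }
}
```

For example, `new TypeCapture<java.util.Map<String, Double>>(){}.getCapturedType()` yields the full parameterized type `java.util.Map<java.lang.String, java.lang.Double>`. By contrast, `java.util.Map.class` alone carries no type arguments.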
+</div>
+
+<div data-lang="scala" markdown="1">
+In Scala, Flink uses *macros* that run at compile time and capture all generic type information while it is
+still available.
+{% highlight scala %}
+// important: this import is needed to access the 'createTypeInformation' macro function
+import org.apache.flink.streaming.api.scala._
+
+val stringInfo: TypeInformation[String] = createTypeInformation[String]
+
+val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]
+{% endhighlight %}
+
+You can still use the same method as in Java as a fallback.
+</div>
+</div>
+
+To create a `TypeSerializer`, simply call `typeInfo.createSerializer(config)` on the `TypeInformation` object.
+
+The `config` parameter is of type `ExecutionConfig` and holds the information about the program's registered
+custom serializers. Wherever possible, try to pass the program's proper ExecutionConfig. You can usually
+obtain it from `DataStream` or `DataSet` via calling `getExecutionConfig()`. Inside functions (like `MapFunction`), you can
+get it by making the function a [Rich Function]() and calling `getRuntimeContext().getExecutionConfig()`.
+
+--------
+--------
+
 ## Type Information in the Scala API
 
 Scala has very elaborate concepts for runtime type information through *type manifests* and *class tags*. In
@@ -156,15 +221,15 @@ def selectFirst[T : TypeInformation](input: DataSet[(T, _)]) : DataSet[T] = {
 {% endhighlight %}
 
 
+--------
+--------
 
-## Type Information in the Java API
 
-Java in general erases generic type information. Only for subclasses of generic classes, the subclass
-stores the type to which the generic type variables bind.
+## Type Information in the Java API
 
-Flink uses reflection on the (anonymous) classes that implement the user functions to figure out the types of
-the generic parameters of the function. This logic also contains some simple type inference for cases where
-the return types of functions are dependent on input types, such as in the generic utility method below:
+In the general case, Java erases generic type information. Flink tries to reconstruct as much type information
+as possible via reflection, using the few bits that Java preserves (mainly function signatures and subclass information).
+This logic also contains some simple type inference for cases where the return type of a function depends on its input type:
 
 {% highlight java %}
 public class AppendOne<T> implements MapFunction<T, Tuple2<T, Long>> {
@@ -175,16 +240,14 @@ public class AppendOne<T> implements MapFunction<T, Tuple2<T, Long>> {
 }
 {% endhighlight %}
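The reflection described above can be sketched with the JDK alone. The helper below is hypothetical and is not Flink's type extractor. It uses `java.util.function.Function` in place of Flink's `MapFunction` so that it runs without Flink.

A class that binds concrete types, like `StringLength`, exposes them through its generic interface signature. A generic class like `Identity<T>`, comparable to `AppendOne<T>`, exposes only the type variable `T`. At that point, inference from the input type or a type hint has to take over.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

class SignatureReflection {
    // Returns the type arguments the class binds for its first parameterized
    // interface (e.g. Function<IN, OUT>), or null if it has none.
    static Type[] boundTypeArguments(Class<?> clazz) {
        for (Type iface : clazz.getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                return ((ParameterizedType) iface).getActualTypeArguments();
            }
        }
        return null;
    }
}

// Binds concrete types (String in, Integer out); javac records them in the class file.
class StringLength implements Function<String, Integer> {
    public Integer apply(String s) { return s.length(); }
}

// Generic class: the class file only records the type variable T.
class Identity<T> implements Function<T, T> {
    public T apply(T t) { return t; }
}
```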
 
-Not in all cases can Flink figure out the data types of functions reliably in Java.
-Some issues remain with generic lambdas (we are trying to solve this with the Java community,
-see below) and with generic type variables that we cannot infer.
+There are cases where Flink cannot reconstruct all generic type information. In that case, a user has to help out via *type hints*.
 
 
 #### Type Hints in the Java API
 
-To help cases where Flink cannot reconstruct the erased generic type information, the Java API
-offers so called *type hints* from version 0.9 on. The type hints tell the system the type of
-the data set produced by a function. The following gives an example:
+In cases where Flink cannot reconstruct the erased generic type information, the Java API
+offers so-called *type hints*. The type hints tell the system the type of
+the data stream or data set produced by a function:
 
 {% highlight java %}
 DataSet<SomeType> result = dataSet
@@ -193,12 +256,11 @@ DataSet<SomeType> result = dataSet
 {% endhighlight %}
 
 The `returns` statement specifies the produced type, in this case via a class. The hints support
-type definition through
+type definition via
 
 * Classes, for non-parameterized types (no generics)
-* Strings in the form of `returns("Tuple2<Integer, my.SomeType>")`, which are parsed and converted
-  to a TypeInformation.
-* A TypeInformation directly
+* TypeHints in the form of `returns(new TypeHint<Tuple2<Integer, SomeType>>(){})`. The `TypeHint` class
+  can capture generic type information and preserve it for the runtime (via an anonymous subclass).
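A hypothetical resolver illustrates how a hint complements extraction. It uses the output type bound in a function class's signature when one exists and otherwise falls back to a caller-supplied hint. This is a plain-Java sketch of the idea behind `returns(...)`, with `java.util.function.Function` standing in for Flink's function interfaces. It is not Flink's implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.function.Function;

class OutputTypeResolver {
    // Returns OUT from Function<IN, OUT> if the class binds it to a concrete type;
    // otherwise returns the hint, failing if no hint was given.
    static Type resolveOutputType(Class<?> functionClass, Type hint) {
        for (Type iface : functionClass.getGenericInterfaces()) {
            if (iface instanceof ParameterizedType
                    && ((ParameterizedType) iface).getRawType() == Function.class) {
                Type out = ((ParameterizedType) iface).getActualTypeArguments()[1];
                if (!(out instanceof TypeVariable)) {
                    return out; // extraction succeeded, no hint needed
                }
            }
        }
        if (hint == null) {
            throw new IllegalStateException("Output type of " + functionClass.getName()
                    + " cannot be extracted; supply a type hint");
        }
        return hint;
    }
}

// Concrete binding: the output type Integer is extractable.
class ToLength implements Function<String, Integer> {
    public Integer apply(String s) { return s.length(); }
}

// Generic: the output type is the type variable T, so a hint is required.
class PassThrough<T> implements Function<T, T> {
    public T apply(T t) { return t; }
}
```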
 
 
 #### Type extraction for Java 8 lambdas
@@ -208,18 +270,7 @@ with an implementing class that extends the function interface.
 
 Currently, Flink tries to figure out which method implements the lambda and uses Java's generic signatures to
 determine the parameter types and the return type. However, these signatures are not generated for lambdas
-by all compilers (as of writing this document only reliably by the Eclipse JDT compiler 4.5 from Milestone 2
-onwards)
-
-
-**Improving Type information for Java Lambdas**
-
-One of the Flink committers (Timo Walther) has actually become active in the Eclipse JDT compiler community and
-in the OpenJDK community and submitted patches to the compiler to improve availability of type information
-available for Java 8 lambdas.
-
-The Eclipse JDT compiler has added support for this as of version 4.5 M4. Discussion about the feature in the
-OpenJDK compiler is pending.
+by all compilers (as of writing this document only reliably by the Eclipse JDT compiler from 4.5 onwards).
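The lambda problem can be observed in plain Java. The runtime class that the JDK generates for a lambda implements the raw functional interface and carries no generic signature. Reflecting on that class therefore yields `Function` without type arguments, while an equivalent anonymous class keeps `Function<String, Integer>`. This is why Flink has to inspect the method that implements the lambda instead. The class name below is hypothetical.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

class LambdaSignatureDemo {
    // True if the object's runtime class declares Function with type arguments.
    static boolean hasGenericFunctionSignature(Object fn) {
        for (Type iface : fn.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType
                    && ((ParameterizedType) iface).getRawType() == Function.class) {
                return true;
            }
        }
        return false;
    }

    // Lambda: its generated class implements the raw Function interface.
    static Function<String, Integer> asLambda() {
        return s -> s.length();
    }

    // Anonymous class: the compiler records Function<String, Integer> in its class file.
    static Function<String, Integer> asAnonymousClass() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }
}
```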
 
 
 #### Serialization of POJO types