Posted to commits@flink.apache.org by fh...@apache.org on 2015/05/12 11:42:17 UTC

[1/3] flink-web git commit: Added blog post about how Flink operates on binary data

Repository: flink-web
Updated Branches:
  refs/heads/asf-site 49b53df18 -> 7a9b48731


http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
----------------------------------------------------------------------
diff --git a/content/news/2015/05/11/Juggling-with-Bits-and-Bytes.html b/content/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
new file mode 100644
index 0000000..07125fb
--- /dev/null
+++ b/content/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
@@ -0,0 +1,473 @@
+<!DOCTYPE html>
+<html lang="en">
+    <head>
+	    <meta charset="utf-8">
+	    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+	    <meta name="viewport" content="width=device-width, initial-scale=1">
+
+	    <title>Apache Flink: Juggling with Bits and Bytes</title>
+	    <link rel="shortcut icon" href="favicon.ico" type="image/x-icon">
+	    <link rel="icon" href="favicon.ico" type="image/x-icon">
+	    <link rel="stylesheet" href="/css/bootstrap.css">
+	    <link rel="stylesheet" href="/css/bootstrap-lumen-custom.css">
+	    <link rel="stylesheet" href="/css/syntax.css">
+	    <link rel="stylesheet" href="/css/custom.css">
+	    <link href="/css/main/main.css" rel="stylesheet">
+            <link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Flink Blog RSS feed" />
+	    <!-- <link href="//maxcdn.bootstrapcdn.com/font-awesome/4.1.0/css/font-awesome.min.css" rel="stylesheet"> -->
+	    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.0/jquery.min.js"></script>
+	    <script src="/js/bootstrap.min.js"></script>
+	    <script src="/js/codetabs.js"></script>
+    </head>
+    <body>
+    <div class="af-header-container af-inner-pages-navigation">
+	<header>
+		<div class="container">
+			<div class="row">
+				<div class="col-md-1 af-mobile-nav-bar">
+					<a href="/" title="Home">
+					<img class="hidden-xs hidden-sm img-responsive"
+						src="/img/main/logo.png" alt="Apache Flink Logo">
+					</a>
+					<div class="row visible-xs">
+						<div class="col-xs-3">
+						    <a href="/" title="Home">
+							<img class="hidden-x hidden-sm img-responsive"
+								src="/img/main/logo.png" alt="Apache Flink Logo">
+							</a>
+						</div>
+						<div class="col-xs-5"></div>
+						<div class="col-xs-4">
+							<div class="af-mobile-btn">
+								<span class="glyphicon glyphicon-plus"></span>
+							</div>
+						</div>
+					</div>
+				</div>
+				<!-- Navigation -->
+				<div class="col-md-11">
+					<nav class="af-main-nav" role="navigation">
+						<ul>
+							<li><a href="#" class="af-nav-links">Quickstart
+									<b class="caret"></b>
+							</a>
+								<ul class="af-dropdown-menu">
+									<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/setup_quickstart.html">Setup
+											Flink</a></li>
+									<li><a
+										href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/java_api_quickstart.html">Java
+											API</a></li>
+									<li><a
+										href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/scala_api_quickstart.html">Scala
+											API</a></li>
+								</ul></li>
+							<li><a href="/downloads.html">Download</a></li>
+							<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/faq.html">FAQ</a></li>
+							<li><a href="#" class="af-nav-links">Documentation <b
+									class="caret"></b></a>
+							  <ul class="af-dropdown-menu">
+                                                            		<li class="af-separator">Current Snapshot:</li>
+									<li></li>
+									<li><a href="http://ci.apache.org/projects/flink/flink-docs-master/">0.9</a></li>
+									<li><a href="http://ci.apache.org/projects/flink/flink-docs-master/api/java">0.9 Javadocs</a></li>
+									<li><a href="http://ci.apache.org/projects/flink/flink-docs-master/api/scala/index.html#org.apache.flink.api.scala.package">0.9 Scaladocs</a></li>
+									<li class="divider"></li>
+									<li class="af-separator">Current Stable:</li>
+									<li></li>
+									<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/">0.8.1</a></li>
+									<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/api/java">0.8.1 Javadocs</a></li>
+									<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/api/scala/index.html#org.apache.flink.api.scala.package">0.8.1 Scaladocs</a></li>
+									<li class="divider"></li>
+									<li></li>
+									<li><a href="/archive.html">Archive</a></li>
+									<li></li>
+								</ul></li>
+							<li><a href="#" class="af-nav-links">Community <b
+									class="caret"></b></a>
+								<ul class="af-dropdown-menu">
+									<li><a href="/community.html#getting-help">Getting Help</a></li>
+									<li><a href="/community.html#mailing-lists">Mailing Lists</a></li>
+									<li><a href="/community.html#issues">Issues</a></li>
+									<li><a href="/community.html#team">Team</a></li>
+									<li class="divider"></li>
+									<li><a href="/how-to-contribute.html">How To
+											Contribute</a></li>
+									<li><a href="/coding_guidelines.html">Coding
+											Guidelines</a></li>
+								</ul></li>
+							<li><a href="#" class="af-nav-links">Project <b
+									class="caret"></b></a>
+								<ul class="af-dropdown-menu">
+									<li><a href="/material.html">Material</a></li>
+									<li><a href="http://www.apache.org/">Apache Software
+											Foundation <span class="glyphicon glyphicon-new-window"></span>
+									</a></li>
+									<li><a
+										href="https://cwiki.apache.org/confluence/display/FLINK">Wiki
+											<span class="glyphicon glyphicon-new-window"></span>
+									</a></li>
+									<li><a
+										href="https://wiki.apache.org/incubator/StratosphereProposal">Incubator
+											Proposal <span class="glyphicon glyphicon-new-window"></span>
+									</a></li>
+									<li><a href="http://www.apache.org/licenses/LICENSE-2.0">License
+											<span class="glyphicon glyphicon-new-window"></span>
+									</a></li>
+									<li><a href="https://github.com/apache/incubator-flink">Source
+											Code <span class="glyphicon glyphicon-new-window"></span>
+									</a></li>
+								</ul></li>
+							<li><a href="/blog/index.html" class="">Blog</a></li>
+						</ul>
+					</nav>
+				</div>
+			</div>
+		</div>
+	</header>
+</div>
+
+
+    <div style="padding-top:50px" class="container">
+        <div class="container">
+    <div class="row">
+		<div class="col-md-2"></div>
+		<div class="col-md-8">
+			<article>
+				<h2>Juggling with Bits and Bytes</h2>
+				    <p class="meta">11 May 2015</p>
+				<div>
+				    <h3 id="how-apache-flink-operates-on-binary-data">How Apache Flink operates on binary data</h3>
+
+<p>Nowadays, a lot of open-source systems for analyzing large data sets are implemented in Java or other JVM-based programming languages. The best-known example is Apache Hadoop, but newer frameworks such as Apache Spark, Apache Drill, and Apache Flink run on JVMs as well. A common challenge that JVM-based data analysis engines face is to store large amounts of data in memory - both for caching and for efficient processing such as sorting and joining of data. Managing the JVM memory well makes the difference between a system that is hard to configure and has unpredictable reliability and performance, and a system that behaves robustly with few configuration knobs.</p>
+
+<p>In this blog post we discuss how Apache Flink manages memory, talk about its custom data de/serialization stack, and show how it operates on binary data.</p>
+
+<h3 id="data-objects?-let’s-put-them-on-the-heap!">Data Objects? Let’s put them on the heap!</h3>
+
+<p>The most straightforward approach to processing lots of data in a JVM is to put it as objects on the heap and operate on these objects. Caching a data set as objects would be as simple as maintaining a list containing an object for each record. An in-memory sort would simply sort the list of objects.
+However, this approach has a few notable drawbacks. First of all, it is not trivial to watch and control heap memory usage when a lot of objects are created and invalidated constantly. Memory overallocation instantly kills the JVM with an <code>OutOfMemoryError</code>. Another aspect is garbage collection on multi-GB JVMs that are flooded with new objects. The overhead of garbage collection in such environments can easily reach 50% or more. Finally, Java objects come with a certain space overhead depending on the JVM and platform. For data sets with many small objects, this can significantly reduce the effectively usable amount of memory. Given proficient system design and careful, use-case-specific parameter tuning, heap memory usage can be more or less controlled and <code>OutOfMemoryErrors</code> avoided. However, such setups are rather fragile, especially if data characteristics or the execution environment change.</p>
+
+<h3 id="what-is-flink-doing-about-that?">What is Flink doing about that?</h3>
+
+<p>Apache Flink has its roots in a research project that aimed to combine the best technologies of MapReduce-based systems and parallel database systems. Coming from this background, Flink has always had its own way of processing data in-memory. Instead of putting lots of objects on the heap, Flink serializes objects into a fixed number of pre-allocated memory segments. Its DBMS-style sort and join algorithms operate as much as possible on this binary data to keep the de/serialization overhead at a minimum. If more data needs to be processed than can be kept in memory, Flink’s operators partially spill data to disk. In fact, a lot of Flink’s internal implementations look more like C/C++ than like typical Java. The following figure gives a high-level overview of how Flink stores data serialized in memory segments and spills to disk if necessary.</p>
+
+<p><center>
+<img src="/img/blog/memory-mgmt.png" style="width:90%;margin:15px">
+</center></p>
+
+<p>Flink’s style of active memory management and operating on binary data has several benefits: </p>
+
+<ol>
+<li><strong>Memory-safe execution &amp; efficient out-of-core algorithms.</strong> Due to the fixed amount of allocated memory segments, it is trivial to monitor remaining memory resources. In case of memory shortage, processing operators can efficiently write larger batches of memory segments to disk and later read them back. Consequently, <code>OutOfMemoryErrors</code> are effectively prevented.</li>
+<li><strong>Reduced garbage collection pressure.</strong> Because all long-lived data is in binary representation in Flink&#39;s managed memory, all data objects are either short-lived or, if mutable, can be reused. Short-lived objects can be garbage-collected more efficiently, which significantly reduces garbage collection pressure. Right now, the pre-allocated memory segments are long-lived objects on the JVM heap, but the Flink community is actively working on allocating off-heap memory for this purpose. This effort will result in much smaller JVM heaps and facilitate even faster garbage collection cycles.</li>
+<li><strong>Space efficient data representation.</strong> Java objects have a storage overhead which can be avoided if the data is stored in a binary representation.</li>
+<li><strong>Efficient binary operations &amp; cache sensitivity.</strong> Binary data can be efficiently compared and operated on given a suitable binary representation. Furthermore, the binary representations can put related values, as well as hash codes, keys, and pointers, adjacently into memory. This results in data structures with more cache-efficient access patterns.</li>
+</ol>
+
+<p>These properties of active memory management are very desirable in data processing systems for large-scale data analytics, but they have a significant price tag attached. Active memory management and operating on binary data are not trivial to implement; for example, using a <code>java.util.HashMap</code> is much easier than implementing a spillable hash-table backed by byte arrays and a custom serialization stack. Of course, Apache Flink is not the only JVM-based data processing system that operates on serialized binary data. Projects such as <a href="http://drill.apache.org/">Apache Drill</a>, <a href="http://ignite.incubator.apache.org/">Apache Ignite (incubating)</a>, or <a href="http://projectgeode.org/">Apache Geode (incubating)</a> apply similar techniques, and it was recently announced that <a href="http://spark.apache.org/">Apache Spark</a> will evolve in this direction with <a href="https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html">Project Tungsten</a>. </p>
+
+<p>In the following we discuss in detail how Flink allocates memory, de/serializes objects, and operates on binary data. We will also show some performance numbers comparing processing objects on the heap and operating on binary data.</p>
+
+<h3 id="how-does-flink-allocate-memory?">How does Flink allocate memory?</h3>
+
+<p>A Flink worker, called TaskManager, is composed of several internal components, such as an actor system for coordination with the Flink master, an IOManager that takes care of spilling data to disk and reading it back, and a MemoryManager that coordinates memory usage. In the context of this blog post, the MemoryManager is of primary interest. </p>
+
+<p>The MemoryManager takes care of allocating, accounting, and distributing MemorySegments to data processing operators such as sort and join operators. A <a href="https://github.com/apache/flink/blob/release-0.9.0-milestone-1/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java">MemorySegment</a> is Flink’s distribution unit of memory and is backed by a regular Java byte array (32 KB by default). A MemorySegment provides very efficient write and read access to its backing byte array using Java’s unsafe methods. You can think of a MemorySegment as a custom-tailored version of Java’s NIO ByteBuffer. In order to operate on multiple MemorySegments as if they were a larger chunk of consecutive memory, Flink uses logical views that implement Java’s <code>java.io.DataOutput</code> and <code>java.io.DataInput</code> interfaces.</p>
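+
+<p>To make the idea concrete, here is a minimal, heap-backed sketch of such a segment with big-endian int access. This is illustrative only and not Flink’s actual class, which relies on Java’s unsafe methods for raw access to the backing array:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">// Simplified sketch of a heap-backed memory segment (illustrative only;
+// Flink's actual MemorySegment uses Java's unsafe methods for efficiency).
+public class SimpleMemorySegment {
+    private final byte[] memory;
+
+    public SimpleMemorySegment(int size) {
+        this.memory = new byte[size];
+    }
+
+    // Write a 4-byte int in big-endian order at the given offset.
+    public void putInt(int offset, int value) {
+        memory[offset]     = (byte) (value &gt;&gt;&gt; 24);
+        memory[offset + 1] = (byte) (value &gt;&gt;&gt; 16);
+        memory[offset + 2] = (byte) (value &gt;&gt;&gt; 8);
+        memory[offset + 3] = (byte) value;
+    }
+
+    // Read a 4-byte int back from the given offset.
+    public int getInt(int offset) {
+        return ((memory[offset] &amp; 0xff) &lt;&lt; 24)
+             | ((memory[offset + 1] &amp; 0xff) &lt;&lt; 16)
+             | ((memory[offset + 2] &amp; 0xff) &lt;&lt; 8)
+             | (memory[offset + 3] &amp; 0xff);
+    }
+}
+</code></pre></div>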
+
+<p>MemorySegments are allocated once at TaskManager start-up time and are destroyed when the TaskManager is shut down. Hence, they are reused and not garbage-collected over the whole lifetime of a TaskManager. After all internal data structures of a TaskManager have been initialized and all core services have been started, the MemoryManager starts creating MemorySegments. By default, 70% of the JVM heap that is available after service initialization is allocated by the MemoryManager. It is also possible to configure an absolute amount of managed memory. The remaining JVM heap is used for objects that are instantiated during task processing, including objects created by user-defined functions. The following figure shows the memory distribution in the TaskManager JVM after startup.</p>
+
+<p><center>
+<img src="/img/blog/memory-alloc.png" style="width:60%;margin:15px">
+</center></p>
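+
+<p>As a rough, hedged illustration of this default policy (not Flink’s actual MemoryManager code), the number of pre-allocated segments follows from the free heap after initialization, the 0.7 fraction, and the 32 KB segment size:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">// Back-of-the-envelope sketch of the default allocation policy described above.
+public class ManagedMemorySketch {
+    public static void main(String[] args) {
+        Runtime rt = Runtime.getRuntime();
+        // Heap that is still free after all services have been initialized.
+        long freeHeapAfterInit = rt.maxMemory() - (rt.totalMemory() - rt.freeMemory());
+        long managedBytes = (long) (freeHeapAfterInit * 0.7); // default fraction
+        long numSegments = managedBytes / (32 * 1024);        // default 32 KB segments
+        System.out.println(numSegments + " segments of 32 KB");
+    }
+}
+</code></pre></div>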
+
+<h3 id="how-does-flink-serialize-objects?">How does Flink serialize objects?</h3>
+
+<p>The Java ecosystem offers several libraries to convert objects into a binary representation and back. Common alternatives are standard Java serialization, <a href="https://github.com/EsotericSoftware/kryo">Kryo</a>, <a href="http://avro.apache.org/">Apache Avro</a>, <a href="http://thrift.apache.org/">Apache Thrift</a>, or Google’s <a href="https://github.com/google/protobuf">Protobuf</a>. Flink includes its own custom serialization framework in order to control the binary representation of data. This is important because operating on binary data, such as comparing or even manipulating it, requires exact knowledge of the serialization layout. Further, tailoring the serialization layout to the operations that are performed on the binary data can yield a significant performance boost. Flink’s serialization stack also leverages the fact that the types of the objects which go through de/serialization are exactly known before a program is executed. </p>
+
+<p>Flink programs can process data represented as arbitrary Java or Scala objects. Before a program is optimized, the data types at each processing step of the program’s data flow need to be identified. For Java programs, Flink features a reflection-based type extraction component to analyze the return types of user-defined functions. Scala programs are analyzed with the help of the Scala compiler. Flink represents each data type with a <a href="https://github.com/apache/flink/blob/release-0.9.0-milestone-1/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java">TypeInformation</a>. Flink has TypeInformations for several kinds of data types, including:</p>
+
+<ul>
+<li>BasicTypeInfo: Any (boxed) Java primitive type or java.lang.String.</li>
+<li>BasicArrayTypeInfo: Any array of a (boxed) Java primitive type or java.lang.String.</li>
+<li>WritableTypeInfo: Any implementation of Hadoop’s Writable interface.</li>
+<li>TupleTypeInfo: Any Flink tuple (Tuple1 to Tuple25). Flink tuples are Java representations for fixed-length tuples with typed fields.</li>
+<li>CaseClassTypeInfo: Any Scala CaseClass (including Scala tuples).</li>
+<li>PojoTypeInfo: Any POJO (Java or Scala), i.e., an object with all fields either being public or accessible through getters and setters that follow the common naming conventions. </li>
+<li>GenericTypeInfo: Any data type that cannot be identified as another type.</li>
+</ul>
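+
+<p>As a small, hedged example (API of the 0.9 line; exact signatures may differ between versions), the TypeInformation of a POJO such as the <code>Person</code> class defined below can be obtained through Flink’s reflection-based type extraction:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+public class TypeInfoExample {
+    // Same POJO as in the serialization example below.
+    public static class Person {
+        public int id;
+        public String name;
+    }
+
+    public static void main(String[] args) {
+        // For a POJO such as Person, type extraction yields a PojoTypeInfo.
+        TypeInformation&lt;Person&gt; typeInfo = TypeExtractor.getForClass(Person.class);
+        System.out.println(typeInfo);
+    }
+}
+</code></pre></div>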
+
+<p>Each TypeInformation provides a serializer for the data type it represents. For example, a BasicTypeInfo returns a serializer that writes the respective primitive type, the serializer of a WritableTypeInfo delegates de/serialization to the write() and readFields() methods of the object implementing Hadoop’s Writable interface, and a GenericTypeInfo returns a serializer that delegates serialization to Kryo. Object serialization to a DataOutput which is backed by Flink MemorySegments goes automatically through Java’s efficient unsafe operations. For data types that can be used as keys, i.e., compared and hashed, the TypeInformation provides TypeComparators. TypeComparators compare and hash objects and can - depending on the concrete data type - also efficiently compare binary representations and extract fixed-length binary key prefixes. </p>
+
+<p>Tuple, Pojo, and CaseClass types are composite types, i.e., containers for one or more possibly nested data types. As such, their serializers and comparators are also composite and delegate the serialization and comparison of their member data types to the respective serializers and comparators. The following figure illustrates the serialization of a (nested) <code>Tuple3&lt;Integer, Double, Person&gt;</code> object where <code>Person</code> is a POJO and defined as follows:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">Person</span> <span class="o">{</span>
+    <span class="kd">public</span> <span class="kt">int</span> <span class="n">id</span><span class="o">;</span>
+    <span class="kd">public</span> <span class="n">String</span> <span class="n">name</span><span class="o">;</span>
+<span class="o">}</span>
+</code></pre></div>
+<p><center>
+<img src="/img/blog/data-serialization.png" style="width:80%;margin:15px">
+</center></p>
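+
+<p>A hand-rolled sketch of the delegation shown in the figure might look as follows: the composite serializer writes its fields in a fixed order and hands the nested POJO to its own routine. This is illustrative only (using plain <code>java.io.DataOutput</code> semantics) and not Flink’s actual tuple or POJO serializer code:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class ManualTuple3Serialization {
+    public static class Person {
+        public int id;
+        public String name;
+    }
+
+    // The "POJO serializer" part: serialize the POJO's fields in order.
+    static void serializePerson(Person p, DataOutputStream out) throws IOException {
+        out.writeInt(p.id);
+        out.writeUTF(p.name); // Flink uses its own String format
+    }
+
+    public static void main(String[] args) throws IOException {
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(bytes);
+
+        Person person = new Person();
+        person.id = 42;
+        person.name = "Fabian";
+
+        // The "tuple serializer" part: Tuple3&lt;Integer, Double, Person&gt;
+        // serializes f0, f1, and f2 in order, delegating the nested POJO.
+        out.writeInt(7);
+        out.writeDouble(3.14);
+        serializePerson(person, out);
+        out.flush();
+
+        System.out.println(bytes.size() + " bytes written");
+    }
+}
+</code></pre></div>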
+
+<p>Flink’s type system can be easily extended by providing custom TypeInformations, Serializers, and Comparators to improve the performance of serializing and comparing custom data types. </p>
+
+<h3 id="how-does-flink-operate-on-binary-data?">How does Flink operate on binary data?</h3>
+
+<p>Similar to many other data processing APIs (including SQL), Flink’s APIs provide transformations to group, sort, and join data sets. These transformations operate on potentially very large data sets. Relational database systems have featured very efficient algorithms for these purposes for several decades, including external merge-sort, merge-join, and hybrid hash-join. Flink builds on this technology, but generalizes it to handle arbitrary objects using its custom serialization and comparison stack. In the following, we show how Flink operates on binary data using the example of Flink’s in-memory sort algorithm.</p>
+
+<p>Flink assigns a memory budget to its data processing operators. Upon initialization, a sort algorithm requests its memory budget from the MemoryManager and receives a corresponding set of MemorySegments. The set of MemorySegments becomes the memory pool of a so-called sort buffer, which collects the data that is to be sorted. The following figure illustrates how data objects are serialized into the sort buffer. </p>
+
+<p><center>
+<img src="/img/blog/sorting-binary-data-1.png" style="width:90%;margin:15px">
+</center></p>
+
+<p>The sort buffer is internally organized into two memory regions. The first region holds the full binary data of all objects. The second region contains pointers to the full binary object data and - depending on the key data type - fixed-length sort keys. When an object is added to the sort buffer, its binary data is appended to the first region, and a pointer (and possibly a key) is appended to the second region. The separation of actual data from pointers plus fixed-length keys serves two purposes: it enables efficient swapping of fixed-length entries (key + pointer) and also reduces the amount of data that needs to be moved when sorting. If the sort key is a variable-length data type such as a String, the fixed-length sort key must be a prefix key, such as the first n characters of a String. Note that not all data types provide a fixed-length (prefix) sort key. When serializing objects into the sort buffer, both memory regions are extended with MemorySegments from the memory pool. Once the memory pool is empty and no more objects can be added, the sort buffer is completely filled and can be sorted. Flink’s sort buffer provides methods to compare and swap elements. This makes the actual sort algorithm pluggable. By default, Flink uses a Quicksort implementation which can fall back to HeapSort. 
+The following figure shows how two objects are compared.</p>
+
+<p><center>
+<img src="/img/blog/sorting-binary-data-2.png" style="width:80%;margin:15px">
+</center></p>
+
+<p>The sort buffer compares two elements by comparing their binary fixed-length sort keys. The comparison is conclusive if it is either done on a full key (not a prefix key) or if the binary prefix keys are not equal. If the prefix keys are equal (or the sort key data type does not provide a binary prefix key), the sort buffer follows the pointers to the actual object data, deserializes both objects, and compares them. Depending on the result of the comparison, the sort algorithm decides whether to swap the compared elements or not. The sort buffer swaps two elements by moving their fixed-length keys and pointers. The actual data is not moved. Once the sort algorithm finishes, the pointers in the sort buffer are correctly ordered. The following figure shows how the sorted data is returned from the sort buffer. </p>
+
+<p><center>
+<img src="/img/blog/sorting-binary-data-3.png" style="width:80%;margin:15px">
+</center></p>
+
+<p>The sorted data is returned by sequentially reading the pointer region of the sort buffer, skipping the sort keys and following the sorted pointers to the actual data. This data is either deserialized and returned as objects or the binary representation is copied and written to disk in case of an external merge-sort (see this <a href="http://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html">blog post on joins in Flink</a>).</p>
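+
+<p>The following is a heavily simplified sketch of this two-region design, assuming a single backing array per region and an <code>int</code> sort key packed next to the pointer. It is illustrative only; Flink’s actual sort buffer spans many MemorySegments, supports prefix keys for variable-length types, and plugs into Quicksort/HeapSort as described above:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.Arrays;
+
+public class TinySortBuffer {
+    // Region 1: serialized record data.
+    private final byte[] recordArea = new byte[1 &lt;&lt; 20];
+    // Region 2: fixed-length entries packing a 32-bit sort key (upper bits)
+    // and a 32-bit pointer into the record area (lower bits).
+    private final long[] keyAndPointer = new long[1024];
+    private int writePos = 0;
+    private int numRecords = 0;
+
+    public void add(int key, byte[] serializedRecord) {
+        System.arraycopy(serializedRecord, 0, recordArea, writePos, serializedRecord.length);
+        keyAndPointer[numRecords++] = ((long) key &lt;&lt; 32) | (writePos &amp; 0xffffffffL);
+        writePos += serializedRecord.length;
+    }
+
+    // Sorting moves only the fixed-length entries; the record data stays put.
+    public void sort() {
+        Arrays.sort(keyAndPointer, 0, numRecords);
+    }
+
+    // After sorting, follow the i-th pointer to the actual record data.
+    public int recordOffset(int i) {
+        return (int) keyAndPointer[i];
+    }
+}
+</code></pre></div>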
+
+<h3 id="show-me-numbers!">Show me numbers!</h3>
+
+<p>So, what does operating on binary data mean for performance? We’ll run a benchmark that sorts 10 million <code>Tuple2&lt;Integer, String&gt;</code> objects to find out. The values of the Integer field are sampled from a uniform distribution. The String field values have a length of 12 characters and are sampled from a long-tail distribution. The input data is provided by an iterator that returns a mutable object, i.e., the same tuple object instance is returned with different field values. Flink uses this technique when reading data from memory, network, or disk to avoid unnecessary object instantiations. The benchmarks are run in a JVM with a 900 MB heap, which is approximately the amount of memory required to store and sort 10 million tuple objects on the heap without dying of an <code>OutOfMemoryError</code>. We sort the tuples on the Integer field and on the String field using three sorting methods:</p>
+
+<ol>
+<li><strong>Object-on-heap.</strong> The tuples are stored in a regular <code>java.util.ArrayList</code> with initial capacity set to 10 million entries and sorted using Java’s regular collection sort.</li>
+<li><strong>Flink-serialized.</strong> The tuple fields are serialized into a sort buffer of 600 MB size using Flink’s custom serializers, sorted as described above, and finally deserialized again. When sorting on the Integer field, the full Integer is used as the sort key, such that the sort happens entirely on binary data (no deserialization of objects required). For sorting on the String field, an 8-byte prefix key is used, and the tuple objects are deserialized if the prefix keys are equal.</li>
+<li><strong>Kryo-serialized.</strong> The tuple fields are serialized into a sort buffer of 600 MB size using Kryo serialization and sorted without binary sort keys. This means that each pair-wise comparison requires two objects to be deserialized.</li>
+</ol>
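+
+<p>As a hedged sketch of the object-on-heap method, here is roughly what the baseline looks like, using a plain pair class instead of Flink’s Tuple2 and simple random values instead of the distributions described above:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Random;
+
+public class ObjectOnHeapSort {
+    static class Pair {
+        final int f0;
+        final String f1;
+        Pair(int f0, String f1) { this.f0 = f0; this.f1 = f1; }
+    }
+
+    public static void main(String[] args) {
+        Random rnd = new Random();
+        int n = 10000000;
+        ArrayList&lt;Pair&gt; data = new ArrayList&lt;Pair&gt;(n);
+        for (int i = 0; i &lt; n; i++) {
+            // Every record is a fresh object; no mutable-object reuse here.
+            data.add(new Pair(rnd.nextInt(), "val-" + rnd.nextInt(1000)));
+        }
+        long start = System.nanoTime();
+        Collections.sort(data, new Comparator&lt;Pair&gt;() {
+            @Override
+            public int compare(Pair a, Pair b) {
+                return a.f0 &lt; b.f0 ? -1 : (a.f0 == b.f0 ? 0 : 1);
+            }
+        });
+        System.out.println("sorted in " + (System.nanoTime() - start) / 1000000 + " ms");
+    }
+}
+</code></pre></div>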
+
+<p>All sort methods are implemented using a single thread. The reported times are averaged over ten runs. After each run, we call <code>System.gc()</code> to request a garbage collection run, which is not included in the measured execution time. The following figure shows the time to store the input data in memory, sort it, and read it back as objects. </p>
+
+<p><center>
+<img src="/img/blog/sort-benchmark.png" style="width:90%;margin:15px">
+</center></p>
+
+<p>We see that Flink’s sort on binary data using its own serializers significantly outperforms the other two methods. Compared to the object-on-heap method, we see that loading the data into memory is much faster. Since we actually collect the objects, there is no opportunity to reuse the object instances; instead, every tuple has to be re-created. This is less efficient than Flink’s serializers (or Kryo serialization). On the other hand, reading objects from the heap comes for free compared to deserialization. In our benchmark, object cloning was more expensive than serialization and deserialization combined. Looking at the sorting time, we see that sorting on the binary representation is also faster than Java’s collection sort. Sorting data that was serialized using Kryo without binary sort keys is much slower than both other methods. This is due to the heavy deserialization overhead. Sorting the tuples on their String field is faster than sorting on the Integer field due to the long-tailed value distribution, which significantly reduces the number of pair-wise comparisons. To get a better sense of what happens during sorting, we monitored the executing JVM using VisualVM. The following screenshots show heap memory usage, garbage collection activity, and CPU usage over the execution of 10 runs.</p>
+
+<table>
+<tr>
+    <td>&nbsp;</td>
+    <th><center><b>Garbage Collection</b></center></th>
+    <th><center><b>Memory Usage</b></center></th>
+</tr>
+<tr>
+    <td><b>Object-on-Heap (int)</b></td>
+    <td><img src="/img/blog/objHeap-int-gc.png" style="width:80%;margin:15px"></td>
+    <td><img src="/img/blog/objHeap-int-mem.png" style="width:80%;margin:15px"></td>
+</tr>
+<tr>
+    <td><b>Flink-Serialized (int)</b></td>
+    <td><img src="/img/blog/flinkSer-int-gc.png" style="width:80%;margin:15px"></td>
+    <td><img src="/img/blog/flinkSer-int-mem.png" style="width:80%;margin:15px"></td>
+</tr>
+<tr>
+    <td><b>Kryo-Serialized (int)</b></td>
+    <td><img src="/img/blog/kryoSer-int-gc.png" style="width:80%;margin:15px"></td>
+    <td><img src="/img/blog/kryoSer-int-mem.png" style="width:80%;margin:15px"></td>
+</tr>
+</table>
+
+<p>The experiments run single-threaded on an 8-core machine, so full utilization of one core corresponds to only 12.5% overall utilization. The screenshots show that operating on binary data significantly reduces garbage collection activity. For the object-on-heap approach, the garbage collector runs in very short intervals while the sort buffer is being filled and causes a lot of CPU usage even for a single processing thread (sorting itself does not trigger the garbage collector). The JVM garbage collects with multiple parallel threads, which explains the high overall CPU utilization. On the other hand, the methods that operate on serialized data rarely trigger the garbage collector and have a much lower CPU utilization. In fact, the garbage collector does not run at all if the tuples are sorted on the Integer field using the flink-serialized method, because no objects need to be deserialized for pair-wise comparisons. The kryo-serialized method requires slightly more garbage collection since it does not use binary sort keys and deserializes two objects for each comparison.</p>
+
+<p>The memory usage charts show that the flink-serialized and kryo-serialized methods constantly occupy a high amount of memory (plus some objects for operation). This is due to the pre-allocation of MemorySegments. The actual memory usage is much lower because the sort buffers are not completely filled. The following table shows the memory consumption of each method. 10 million records result in about 280 MB of binary data (object data plus pointers and sort keys), depending on the serializer used and the presence and size of a binary sort key. Comparing this to the memory requirements of the object-on-heap approach, we see that operating on binary data can significantly improve memory efficiency. In our benchmark, more than twice as much data can be sorted in-memory if it is serialized into a sort buffer instead of being held as objects on the heap.</p>
+
+<p><table width="100%">
+<tr><td><b>Occupied Memory</b></td>
+    <td><b>Object-on-Heap</b></td>
+    <td><b>Flink-Serialized</b></td>
+    <td><b>Kryo-Serialized</b></td>
+</tr>
+<tr>
+    <td><b>Sort on Integer</b></td>
+    <td>approx. 700 MB (heap)</td>
+    <td>277 MB (sort buffer)</td>
+    <td>266 MB (sort buffer)</td>
+</tr>
+<tr>
+    <td><b>Sort on String</b></td>
+    <td>approx. 700 MB (heap)</td>
+    <td>315 MB (sort buffer)</td>
+    <td>266 MB (sort buffer)</td>
+</tr>
+</table><br></p>
+
+<p>To summarize, the experiments verify the previously stated benefits of operating on binary data. </p>
+
+<h3 id="we’re-not-done-yet!">We’re not done yet!</h3>
+
+<p>Apache Flink features quite a few advanced techniques to safely and efficiently process huge amounts of data with limited memory resources. However, there are a few points that could make Flink even more efficient. The Flink community is working on moving the managed memory to off-heap memory. This will allow for smaller JVM heaps, lower garbage collection overhead, and also easier system configuration. With Flink’s Table API, the semantics of all operations such as aggregations and projections are known (in contrast to black-box user-defined functions). Hence, we can generate code for Table API operations that directly operates on binary data. Further improvements include serialization layouts that are tailored towards the operations applied on the binary data, as well as code generation for serializers and comparators. </p>
+
+<p>The groundwork (and a lot more) for operating on binary data is done, but there is still some room for making Flink even better and faster. If you are crazy about performance and like to juggle with lots of bits and bytes, join the Flink community! </p>
+
+<h3 id="tl;dr;-give-me-three-things-to-remember!">TL;DR; Give me three things to remember!</h3>
+
+<ul>
+<li>Flink’s active memory management avoids nasty <code>OutOfMemoryErrors</code> that kill your JVMs and reduces garbage collection overhead.</li>
+<li>Flink features a highly efficient data de/serialization stack that facilitates operations on binary data and makes more data fit into memory.</li>
+<li>Flink’s DBMS-style operators operate natively on binary data, yielding high in-memory performance and destaging gracefully to disk if necessary.</li>
+</ul>
+
+<p><br>
+<small>Written by Fabian Hueske (<a href="https://twitter.com/fhueske">@fhueske</a>).</small></p>
+
+				</div>
+			</article>
+		</div>
+		<div class="col-md-2"></div>
+	</div>
+	<div class="row" style="padding-top:30px">
+		<div class="col-md-2"></div>
+		<div class="col-md-8">
+		    <div id="disqus_thread"></div>
+		    <script type="text/javascript">
+		        /* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */
+		        var disqus_shortname = 'stratosphere-eu'; // required: replace example with your forum shortname
+
+		        /* * * DON'T EDIT BELOW THIS LINE * * */
+		        (function() {
+		            var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
+		            dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
+		            (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
+		        })();
+		    </script>
+		    <noscript>Please enable JavaScript to view the <a href="http://disqus.com/?ref_noscript">comments powered by Disqus.</a></noscript>
+		    <a href="http://disqus.com" class="dsq-brlink">comments powered by <span class="logo-disqus">Disqus</span></a>			    
+		</div>
+		<div class="col-md-2"></div>
+	</div>
+</div>
+
+    </div>
+    <!--<section id="af-upfooter" class="af-section">
+	<div class="container">
+		<p>Apache Flink is an effort undergoing incubation at The Apache
+			Software Foundation (ASF), sponsored by the Apache Incubator PMC.
+			Incubation is required of all newly accepted projects until a further
+			review indicates that the infrastructure, communications, and
+			decision making process have stabilized in a manner consistent with
+			other successful ASF projects. While incubation status is not
+			necessarily a reflection of the completeness or stability of the
+			code, it does indicate that the project has yet to be fully endorsed
+			by the ASF.</p>
+		<a href="http://incubator.apache.org"> <img class="img-responsive"
+			src="/img/main/apache-incubator-logo.png" alt="Apache Flink" />
+		</a>
+		<p class="text-center">
+			<a href="/privacy-policy.html" title="Privacy Policy"
+				class="af-privacy-policy">Privacy Policy</a>
+		</p>
+	</div>
+</section>-->
+
+<footer id="af-footer">
+	<div class="container">
+		<div class="row">
+			<div class="col-md-3">
+				<h3>Documentation</h3>
+				<ul class="af-footer-menu">
+
+					<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/">0.8.1</a></li>
+					<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/api/java/">0.8.1 Javadocs</a></li>
+					<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-0.8/api/scala/index.html#org.apache.flink.api.scala.package">0.8.1 Scaladocs</a></li>
+				</ul>
+			</div>
+			<div class="col-md-3">
+				<h3>Community</h3>
+				<ul class="af-footer-menu">
+					<li><a href="/community.html#mailing-lists">Mailing Lists</a></li>
+					<li><a href="https://issues.apache.org/jira/browse/FLINK"
+						target="blank">Issues <span
+							class="glyphicon glyphicon-new-window"></span></a></li>
+					<li><a href="/community.html#team">Team</a></li>
+					<li><a href="/how-to-contribute.html">How to contribute</a></li>
+					<li><a href="/coding_guidelines.html">Coding Guidelines</a></li>
+				</ul>
+			</div>
+			<div class="col-md-3">
+				<h3>ASF</h3>
+				<ul class="af-footer-menu">
+					<li><a href="http://www.apache.org/" target="blank">Apache
+							Software foundation <span class="glyphicon glyphicon-new-window"></span>
+					</a></li>
+					<li><a
+						href="http://www.apache.org/foundation/how-it-works.html"
+						target="blank">How it works <span
+							class="glyphicon glyphicon-new-window"></span></a></li>
+					<li><a href="http://www.apache.org/foundation/thanks.html"
+						target="blank">Thanks <span
+							class="glyphicon glyphicon-new-window"></span></a></li>
+					<li><a
+						href="http://www.apache.org/foundation/sponsorship.html"
+						target="blank">Become a sponsor <span
+							class="glyphicon glyphicon-new-window"></span></a></li>
+					<li><a href="http://incubator.apache.org/projects/flink.html"
+						target="blank">Incubation status page <span
+							class="glyphicon glyphicon-new-window"></span></a></li>
+				</ul>
+			</div>
+			<div class="col-md-3">
+				<h3>Project</h3>
+				<ul class="af-footer-menu">
+					<li><a href="/material.html" target="blank">Material <span
+							class="glyphicon glyphicon-new-window"></span></a></li>
+					<li><a
+						href="https://cwiki.apache.org/confluence/display/FLINK"
+						target="blank">Wiki <span
+							class="glyphicon glyphicon-new-window"></span></a></li>
+					<li><a
+						href="https://wiki.apache.org/incubator/StratosphereProposal"
+						target="blank">Incubator proposal <span
+							class="glyphicon glyphicon-new-window"></span></a></li>
+					<li><a href="http://www.apache.org/licenses/LICENSE-2.0"
+						target="blank">License <span
+							class="glyphicon glyphicon-new-window"></span></a></li>
+					<li><a href="https://github.com/apache/incubator-flink"
+						target="blank">Source code <span
+							class="glyphicon glyphicon-new-window"></span></a></li>
+				</ul>
+			</div>
+		</div>
+	</div>
+	<div class="af-footer-bar">
+		<div class="container">
+		  <p>Copyright &copy; 2014-2015, <a href="http://www.apache.org">The Apache Software Foundation</a>. All Rights Reserved. Apache and the Apache feather logo are trademarks of the Apache Software Foundation.
+                  </p>
+                  <div>
+                    <div style="float:left">
+                      <p>
+                        <a href="/privacy-policy.html" title="Privacy Policy" class="af-privacy-policy">Privacy Policy</a>
+                    </p>
+                    </div>
+                    <div style="float:right">
+                    <p>
+                      <a href="/blog/feed.xml" class="af-privacy-policy">RSS Feed</a>
+                    </p>
+                    </div>
+                   </div>
+    		</div>
+	</div>
+</footer>
+
+    <!-- Google Analytics -->
+    <script>
+      (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
+      (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
+      m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
+      })(window,document,'script','//www.google-analytics.com/analytics.js','ga');
+
+      ga('create', 'UA-52545728-1', 'auto');
+      ga('send', 'pageview');
+    </script>
+    <script src="/js/main/jquery.mobile.events.min.js"></script>
+    <script src="/js/main/main.js"></script>
+  </body>
+</html>

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/img/blog/data-serialization.png
----------------------------------------------------------------------
diff --git a/img/blog/data-serialization.png b/img/blog/data-serialization.png
new file mode 100644
index 0000000..80667f6
Binary files /dev/null and b/img/blog/data-serialization.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/img/blog/flinkSer-int-gc.png
----------------------------------------------------------------------
diff --git a/img/blog/flinkSer-int-gc.png b/img/blog/flinkSer-int-gc.png
new file mode 100644
index 0000000..29ec5a3
Binary files /dev/null and b/img/blog/flinkSer-int-gc.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/img/blog/flinkSer-int-mem.png
----------------------------------------------------------------------
diff --git a/img/blog/flinkSer-int-mem.png b/img/blog/flinkSer-int-mem.png
new file mode 100644
index 0000000..23750e1
Binary files /dev/null and b/img/blog/flinkSer-int-mem.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/img/blog/kryoSer-int-gc.png
----------------------------------------------------------------------
diff --git a/img/blog/kryoSer-int-gc.png b/img/blog/kryoSer-int-gc.png
new file mode 100644
index 0000000..4883d12
Binary files /dev/null and b/img/blog/kryoSer-int-gc.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/img/blog/kryoSer-int-mem.png
----------------------------------------------------------------------
diff --git a/img/blog/kryoSer-int-mem.png b/img/blog/kryoSer-int-mem.png
new file mode 100644
index 0000000..0ab4483
Binary files /dev/null and b/img/blog/kryoSer-int-mem.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/img/blog/memory-alloc.png
----------------------------------------------------------------------
diff --git a/img/blog/memory-alloc.png b/img/blog/memory-alloc.png
new file mode 100644
index 0000000..2e8d17b
Binary files /dev/null and b/img/blog/memory-alloc.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/img/blog/memory-mgmt.png
----------------------------------------------------------------------
diff --git a/img/blog/memory-mgmt.png b/img/blog/memory-mgmt.png
new file mode 100644
index 0000000..72e7602
Binary files /dev/null and b/img/blog/memory-mgmt.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/img/blog/objHeap-int-gc.png
----------------------------------------------------------------------
diff --git a/img/blog/objHeap-int-gc.png b/img/blog/objHeap-int-gc.png
new file mode 100644
index 0000000..6fca8df
Binary files /dev/null and b/img/blog/objHeap-int-gc.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/img/blog/objHeap-int-mem.png
----------------------------------------------------------------------
diff --git a/img/blog/objHeap-int-mem.png b/img/blog/objHeap-int-mem.png
new file mode 100644
index 0000000..a43e772
Binary files /dev/null and b/img/blog/objHeap-int-mem.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/img/blog/sort-benchmark.png
----------------------------------------------------------------------
diff --git a/img/blog/sort-benchmark.png b/img/blog/sort-benchmark.png
new file mode 100644
index 0000000..1fb796d
Binary files /dev/null and b/img/blog/sort-benchmark.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/img/blog/sorting-binary-data-1.png
----------------------------------------------------------------------
diff --git a/img/blog/sorting-binary-data-1.png b/img/blog/sorting-binary-data-1.png
new file mode 100644
index 0000000..814a76f
Binary files /dev/null and b/img/blog/sorting-binary-data-1.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/img/blog/sorting-binary-data-2.png
----------------------------------------------------------------------
diff --git a/img/blog/sorting-binary-data-2.png b/img/blog/sorting-binary-data-2.png
new file mode 100644
index 0000000..821c0da
Binary files /dev/null and b/img/blog/sorting-binary-data-2.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/img/blog/sorting-binary-data-3.png
----------------------------------------------------------------------
diff --git a/img/blog/sorting-binary-data-3.png b/img/blog/sorting-binary-data-3.png
new file mode 100644
index 0000000..e682e06
Binary files /dev/null and b/img/blog/sorting-binary-data-3.png differ


[2/3] flink-web git commit: Added blog post about how Flink operates on binary data

Posted by fh...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/blog/index.html
----------------------------------------------------------------------
diff --git a/content/blog/index.html b/content/blog/index.html
index 107d6da..2d39bde 100644
--- a/content/blog/index.html
+++ b/content/blog/index.html
@@ -134,6 +134,196 @@
 		<div class="col-md-8">
 			
 			<article>
+				<h2><a href="/news/2015/05/11/Juggling-with-Bits-and-Bytes.html">Juggling with Bits and Bytes</a></h2>
+				<p class="meta">11 May 2015</p>
+
+				<div><h3 id="how-apache-flink-operates-on-binary-data">How Apache Flink operates on binary data</h3>
+
+<p>Nowadays, a lot of open-source systems for analyzing large data sets are implemented in Java or other JVM-based programming languages. The most well-known example is Apache Hadoop, but also newer frameworks such as Apache Spark, Apache Drill, and also Apache Flink run on JVMs. A common challenge that JVM-based data analysis engines face is to store large amounts of data in memory - both for caching and for efficient processing such as sorting and joining of data. Managing the JVM memory well makes the difference between a system that is hard to configure and has unpredictable reliability and performance and a system that behaves robustly with few configuration knobs.</p>
+
+<p>In this blog post we discuss how Apache Flink manages memory, talk about its custom data de/serialization stack, and show how it operates on binary data.</p>
+
+<h3 id="data-objects?-let’s-put-them-on-the-heap!">Data Objects? Let’s put them on the heap!</h3>
+
+<p>The most straight-forward approach to process lots of data in a JVM is to put it as objects on the heap and operate on these objects. Caching a data set as objects would be as simple as maintaining a list containing an object for each record. An in-memory sort would simply sort the list of objects.
+However, this approach has a few notable drawbacks. First of all it is not trivial to watch and control heap memory usage when a lot of objects are created and invalidated constantly. Memory overallocation instantly kills the JVM with an <code>OutOfMemoryError</code>. Another aspect is garbage collection on multi-GB JVMs which are flooded with new objects. The overhead of garbage collection in such environments can easily reach 50% and more. Finally, Java objects come with a certain space overhead depending on the JVM and platform. For data sets with many small objects this can significantly reduce the effectively usable amount of memory. Given proficient system design and careful, use-case specific system parameter tuning, heap memory usage can be more or less controlled and <code>OutOfMemoryErrors</code> avoided. However, such setups are rather fragile especially if data characteristics or the execution environment change.</p>
+
+<h3 id="what-is-flink-doing-about-that?">What is Flink doing about that?</h3>
+
+<p>Apache Flink has its roots at a research project which aimed to combine the best technologies of MapReduce-based systems and parallel database systems. Coming from this background, Flink has always had its own way of processing data in-memory. Instead of putting lots of objects on the heap, Flink serializes objects into a fixed number of pre-allocated memory segments. Its DBMS-style sort and join algorithms operate as much as possible on this binary data to keep the de/serialization overhead at a minimum. If more data needs to be processed than can be kept in memory, Flink’s operators partially spill data to disk. In fact, a lot of Flink’s internal implementations look more like C/C++ rather than common Java. The following figure gives a high-level overview of how Flink stores data serialized in memory segments and spills to disk if necessary.</p>
+
+<p><center>
+<img src="/img/blog/memory-mgmt.png" style="width:90%;margin:15px">
+</center></p>
+
+<p>Flink’s style of active memory management and operating on binary data has several benefits: </p>
+
+<ol>
+<li><strong>Memory-safe execution &amp; efficient out-of-core algorithms.</strong> Due to the fixed amount of allocated memory segments, it is trivial to monitor remaining memory resources. In case of memory shortage, processing operators can efficiently write larger batches of memory segments to disk and later them read back. Consequently, <code>OutOfMemoryErrors</code> are effectively prevented.</li>
+<li><strong>Reduced garbage collection pressure.</strong> Because all long-lived data is in binary representation in Flink&#39;s managed memory, all data objects are short-lived or even mutable and can be reused. Short-lived objects can be more efficiently garbage-collected, which significantly reduces garbage collection pressure. Right now, the pre-allocated memory segments are long-lived objects on the JVM heap, but the Flink community is actively working on allocating off-heap memory for this purpose. This effort will result in much smaller JVM heaps and facilitate even faster garbage collection cycles.</li>
+<li><strong>Space efficient data representation.</strong> Java objects have a storage overhead which can be avoided if the data is stored in a binary representation.</li>
+<li><strong>Efficient binary operations &amp; cache sensitivity.</strong> Binary data can be efficiently compared and operated on given a suitable binary representation. Furthermore, the binary representations can put related values, as well as hash codes, keys, and pointers, adjacently into memory. This gives data structures with usually more cache efficient access patterns.</li>
+</ol>
+
+<p>These properties of active memory management are very desirable in a data processing systems for large-scale data analytics but have a significant price tag attached. Active memory management and operating on binary data is not trivial to implement, i.e., using <code>java.util.HashMap</code> is much easier than implementing a spillable hash-table backed by byte arrays and a custom serialization stack. Of course Apache Flink is not the only JVM-based data processing system that operates on serialized binary data. Projects such as <a href="http://drill.apache.org/">Apache Drill</a>, <a href="http://ignite.incubator.apache.org/">Apache Ignite (incubating)</a> or <a href="http://projectgeode.org/">Apache Geode (incubating)</a> apply similar techniques and it was recently announced that also <a href="http://spark.apache.org/">Apache Spark</a> will evolve into this direction with <a href="https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html">
 Project Tungsten</a>. </p>
+
+<p>In the following we discuss in detail how Flink allocates memory, de/serializes objects, and operates on binary data. We will also show some performance numbers comparing processing objects on the heap and operating on binary data.</p>
+
+<h3 id="how-does-flink-allocate-memory?">How does Flink allocate memory?</h3>
+
+<p>A Flink worker, called TaskManager, is composed of several internal components such as an actor system for coordination with the Flink master, an IOManager that takes care of spilling data to disk and reading it back, and a MemoryManager that coordinates memory usage. In the context of this blog post, the MemoryManager is of most interest. </p>
+
+<p>The MemoryManager takes care of allocating, accounting, and distributing MemorySegments to data processing operators such as sort and join operators. A <a href="https://github.com/apache/flink/blob/release-0.9.0-milestone-1/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java">MemorySegment</a> is Flink’s distribution unit of memory and is backed by a regular Java byte array (size is 32 KB by default). A MemorySegment provides very efficient write and read access to its backed byte array using Java’s unsafe methods. You can think of a MemorySegment as a custom-tailored version of Java’s NIO ByteBuffer. In order to operate on multiple MemorySegments like on a larger chunk of consecutive memory, Flink uses logical views that implement Java’s <code>java.io.DataOutput</code> and <code>java.io.DataInput</code> interfaces.</p>
+
+<p>MemorySegments are allocated once at TaskManager start-up time and are destroyed when the TaskManager is shut down. Hence, they are reused and not garbage-collected over the whole lifetime of a TaskManager. After all internal data structures of a TaskManager have been initialized and all core services have been started, the MemoryManager starts creating MemorySegments. By default 70% of the JVM heap that is available after service initialization is allocated by the MemoryManager. It is also possible to configure an absolute amount of managed memory. The remaining JVM heap is used for objects that are instantiated during task processing, including objects created by user-defined functions. The following figure shows the memory distribution in the TaskManager JVM after startup.</p>
+
+<p><center>
+<img src="/img/blog/memory-alloc.png" style="width:60%;margin:15px">
+</center></p>
+
+<h3 id="how-does-flink-serialize-objects?">How does Flink serialize objects?</h3>
+
+<p>The Java ecosystem offers several libraries to convert objects into a binary representation and back. Common alternatives are standard Java serialization, <a href="https://github.com/EsotericSoftware/kryo">Kryo</a>, <a href="http://avro.apache.org/">Apache Avro</a>, <a href="http://thrift.apache.org/">Apache Thrift</a>, or Google’s <a href="https://github.com/google/protobuf">Protobuf</a>. Flink includes its own custom serialization framework in order to control the binary representation of data. This is important because operating on binary data such as comparing or even manipulating binary data requires exact knowledge of the serialization layout. Further, configuring the serialization layout with respect to operations that are performed on binary data can yield a significant performance boost. Flink’s serialization stack also leverages the fact, that the type of the objects which are going through de/serialization are exactly known before a program is executed. </p>
+
+<p>Flink programs can process data represented as arbitrary Java or Scala objects. Before a program is optimized, the data types at each processing step of the program’s data flow need to be identified. For Java programs, Flink features a reflection-based type extraction component to analyze the return types of user-defined functions. Scala programs are analyzed with help of the Scala compiler. Flink represents each data type with a <a href="https://github.com/apache/flink/blob/release-0.9.0-milestone-1/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java">TypeInformation</a>. Flink has TypeInformations for several kinds of data types, including:</p>
+
+<ul>
+<li>BasicTypeInfo: Any (boxed) Java primitive type or java.lang.String.</li>
+<li>BasicArrayTypeInfo: Any array of a (boxed) Java primitive type or java.lang.String.</li>
+<li>WritableTypeInfo: Any implementation of Hadoop’s Writable interface.</li>
+<li>TupleTypeInfo: Any Flink tuple (Tuple1 to Tuple25). Flink tuples are Java representations for fixed-length tuples with typed fields.</li>
+<li>CaseClassTypeInfo: Any Scala CaseClass (including Scala tuples).</li>
+<li>PojoTypeInfo: Any POJO (Java or Scala), i.e., an object with all fields either being public or accessible through getters and setter that follow the common naming conventions. </li>
+<li>GenericTypeInfo: Any data type that cannot be identified as another type.</li>
+</ul>
+
+<p>Each TypeInformation provides a serializer for the data type it represents. For example, a BasicTypeInfo returns a serializer that writes the respective primitive type, the serializer of a WritableTypeInfo delegates de/serialization to the write() and readFields() methods of the object implementing Hadoop’s Writable interface, and a GenericTypeInfo returns a serializer that delegates serialization to Kryo. Object serialization to a DataOutput which is backed by Flink MemorySegments goes automatically through Java’s efficient unsafe operations. For data types that can be used as keys, i.e., compared and hashed, the TypeInformation provides TypeComparators. TypeComparators compare and hash objects and can - depending on the concrete data type - also efficiently compare binary representations and extract fixed-length binary key prefixes. </p>
+
+<p>Tuple, Pojo, and CaseClass types are composite types, i.e., containers for one or more possibly nested data types. As such, their serializers and comparators are also composite and delegate the serialization and comparison of their member data types to the respective serializers and comparators. The following figure illustrates the serialization of a (nested) <code>Tuple3&lt;Integer, Double, Person&gt;</code> object where <code>Person</code> is a POJO defined as follows:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">Person</span> <span class="o">{</span>
+    <span class="kd">public</span> <span class="kt">int</span> <span class="n">id</span><span class="o">;</span>
+    <span class="kd">public</span> <span class="n">String</span> <span class="n">name</span><span class="o">;</span>
+<span class="o">}</span>
+</code></pre></div>
+<p><center>
+<img src="/img/blog/data-serialization.png" style="width:80%;margin:15px">
+</center></p>
+
+<p>Flink’s type system can be easily extended by providing custom TypeInformations, Serializers, and Comparators to improve the performance of serializing and comparing custom data types. </p>
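+
+<p>To illustrate the idea of a fully deterministic binary layout, a serializer for the <code>Person</code> POJO above could, in spirit, look like the following sketch. It is written against <code>java.io.DataOutput</code>/<code>DataInput</code> (the interfaces Flink’s memory views implement) rather than Flink’s actual TypeSerializer interface:</p>
+<div class="highlight"><pre><code class="language-java">import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+// Sketch of a fixed-layout serializer for the Person POJO above
+// (not Flink's TypeSerializer interface, which differs).
+// The layout is fully known: 4 bytes for the id, followed by the UTF-encoded name.
+public class PersonSerializer {
+    public void serialize(Person p, DataOutput out) throws IOException {
+        out.writeInt(p.id);
+        out.writeUTF(p.name);
+    }
+
+    public Person deserialize(DataInput in) throws IOException {
+        Person p = new Person();
+        p.id = in.readInt();
+        p.name = in.readUTF();
+        return p;
+    }
+}
+</code></pre></div>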
+
+<h3 id="how-does-flink-operate-on-binary-data?">How does Flink operate on binary data?</h3>
+
+<p>Similar to many other data processing APIs (including SQL), Flink’s APIs provide transformations to group, sort, and join data sets. These transformations operate on potentially very large data sets. Relational database systems have featured very efficient algorithms for these purposes for several decades, including external merge-sort, merge-join, and hybrid hash-join. Flink builds on this technology, but generalizes it to handle arbitrary objects using its custom serialization and comparison stack. In the following, we show how Flink operates on binary data using the example of its in-memory sort algorithm.</p>
+
+<p>Flink assigns a memory budget to its data processing operators. Upon initialization, a sort algorithm requests its memory budget from the MemoryManager and receives a corresponding set of MemorySegments. This set of MemorySegments becomes the memory pool of a so-called sort buffer, which collects the data that is to be sorted. The following figure illustrates how data objects are serialized into the sort buffer.</p>
+
+<p><center>
+<img src="/img/blog/sorting-binary-data-1.png" style="width:90%;margin:15px">
+</center></p>
+
+<p>The sort buffer is internally organized into two memory regions. The first region holds the full binary data of all objects. The second region contains pointers to the full binary object data and - depending on the key data type - fixed-length sort keys. When an object is added to the sort buffer, its binary data is appended to the first region, and a pointer (and possibly a key) is appended to the second region. The separation of actual data from pointers plus fixed-length keys serves two purposes: it enables efficient swapping of fixed-length entries (key+pointer) and it reduces the amount of data that needs to be moved when sorting. If the sort key is a variable-length data type such as a String, the fixed-length sort key must be a prefix key, such as the first n characters of a String. Note that not all data types provide a fixed-length (prefix) sort key. When serializing objects into the sort buffer, both memory regions are extended with MemorySegments from the memory pool. Once the memory pool is empty and no more objects can be added, the sort buffer is completely filled and can be sorted. Flink’s sort buffer provides methods to compare and swap elements, which makes the actual sort algorithm pluggable. By default, Flink uses a Quicksort implementation that can fall back to HeapSort.
+The following figure shows how two objects are compared.</p>
+
+<p><center>
+<img src="/img/blog/sorting-binary-data-2.png" style="width:80%;margin:15px">
+</center></p>
+
+<p>The sort buffer compares two elements by comparing their binary fixed-length sort keys. The comparison is conclusive if it is either done on a full key (not a prefix key) or if the binary prefix keys are not equal. If the prefix keys are equal (or the sort key data type does not provide a binary prefix key), the sort buffer follows the pointers to the actual object data, deserializes both objects, and compares them. Depending on the result of the comparison, the sort algorithm decides whether to swap the compared elements or not. The sort buffer swaps two elements by moving their fixed-length keys and pointers; the actual data is not moved. Once the sort algorithm finishes, the pointers in the sort buffer are correctly ordered. The following figure shows how the sorted data is returned from the sort buffer.</p>
+
+<p><center>
+<img src="/img/blog/sorting-binary-data-3.png" style="width:80%;margin:15px">
+</center></p>
+
+<p>The sorted data is returned by sequentially reading the pointer region of the sort buffer, skipping the sort keys, and following the sorted pointers to the actual data. This data is either deserialized and returned as objects, or the binary representation is copied and written to disk in the case of an external merge-sort (see this <a href="http://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html">blog post on joins in Flink</a>).</p>
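+
+<p>A self-contained toy version of such a sort buffer is sketched below. It illustrates the two-region design only and is far simpler than Flink’s implementation; in particular, the 4-byte int key is assumed to be the full sort key, so no deserialization fallback is needed:</p>
+<div class="highlight"><pre><code class="language-java">import java.util.Arrays;
+
+// Toy sort buffer (illustrative only): a data region with the serialized records and
+// an index region with fixed-length entries, each packing the 4-byte sort key (high
+// bits) and a pointer into the data region (low bits). Sorting moves only index
+// entries; the serialized records never move.
+public class ToySortBuffer {
+    private final byte[] data = new byte[4096]; // data region: [length][payload] per record
+    private final long[] index = new long[64];  // index region: (key &lt;&lt; 32) | pointer
+    private int dataPos = 0;
+    private int count = 0;
+
+    public void add(int key, byte[] payload) {
+        data[dataPos] = (byte) payload.length;  // toy 1-byte length header
+        System.arraycopy(payload, 0, data, dataPos + 1, payload.length);
+        index[count++] = ((long) key &lt;&lt; 32) | dataPos;
+        dataPos += 1 + payload.length;
+    }
+
+    // Sorting the packed (key, pointer) entries orders the records by key. Flink's
+    // buffer additionally falls back to deserializing both records when fixed-length
+    // prefix keys compare as equal; this toy assumes the int key is the full key.
+    public void sort() {
+        Arrays.sort(index, 0, count);
+    }
+
+    // Follow the i-th sorted pointer back into the data region.
+    public byte[] payloadAt(int i) {
+        int ptr = (int) index[i];
+        int len = data[ptr];
+        return Arrays.copyOfRange(data, ptr + 1, ptr + 1 + len);
+    }
+
+    public static void main(String[] args) {
+        ToySortBuffer buf = new ToySortBuffer();
+        buf.add(42, "banana".getBytes());
+        buf.add(7, "apple".getBytes());
+        buf.add(99, "cherry".getBytes());
+        buf.sort();
+        for (int i = 0; i &lt; buf.count; i++) {
+            System.out.println(new String(buf.payloadAt(i))); // apple, banana, cherry
+        }
+    }
+}
+</code></pre></div>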
+
+<h3 id="show-me-numbers!">Show me numbers!</h3>
+
+<p>So, what does operating on binary data mean for performance? We’ll run a benchmark that sorts 10 million <code>Tuple2&lt;Integer, String&gt;</code> objects to find out. The values of the Integer field are sampled from a uniform distribution. The String field values have a length of 12 characters and are sampled from a long-tail distribution. The input data is provided by an iterator that returns a mutable object, i.e., the same tuple object instance is returned with different field values. Flink uses this technique when reading data from memory, network, or disk to avoid unnecessary object instantiations (this pattern is sketched after the list below). The benchmarks are run in a JVM with a 900 MB heap, which is approximately the amount of memory required to store and sort 10 million tuple objects on the heap without dying of an <code>OutOfMemoryError</code>. We sort the tuples on the Integer field and on the String field using three sorting methods:</p>
+
+<ol>
+<li><strong>Object-on-heap.</strong> The tuples are stored in a regular <code>java.util.ArrayList</code> with initial capacity set to 10 million entries and sorted using Java’s regular collection sort.</li>
+<li><strong>Flink-serialized.</strong> The tuple fields are serialized into a sort buffer of 600 MB size using Flink’s custom serializers, sorted as described above, and finally deserialized again. When sorting on the Integer field, the full Integer is used as the sort key so that the sort happens entirely on binary data (no deserialization of objects required). For sorting on the String field, an 8-byte prefix key is used and tuple objects are deserialized if the prefix keys are equal.</li>
+<li><strong>Kryo-serialized.</strong> The tuple fields are serialized into a sort buffer of 600 MB size using Kryo serialization and sorted without binary sort keys. This means that each pair-wise comparison requires two objects to be deserialized.</li>
+</ol>
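+
+<p>The mutable-object pattern used for reading the input can be sketched as follows. This is hypothetical benchmark scaffolding, not the actual benchmark code; the string generation is a placeholder:</p>
+<div class="highlight"><pre><code class="language-java">import java.util.Random;
+
+// Sketch of a source that reuses a single record instance (not the actual benchmark code).
+public class MutableTupleSource {
+    // Stand-in for Flink's Tuple2&lt;Integer, String&gt;.
+    public static class IntStringTuple {
+        public int f0;
+        public String f1;
+    }
+
+    private final IntStringTuple reuse = new IntStringTuple();
+    private final Random rnd = new Random(42);
+    private int remaining;
+
+    public MutableTupleSource(int numRecords) {
+        this.remaining = numRecords;
+    }
+
+    public boolean hasNext() {
+        return remaining &gt; 0;
+    }
+
+    public IntStringTuple next() {
+        remaining--;
+        reuse.f0 = rnd.nextInt();          // uniformly distributed int field
+        reuse.f1 = "record-" + remaining;  // placeholder for the sampled string field
+        return reuse;                      // the same instance is returned every call
+    }
+}
+</code></pre></div>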
+
+<p>All sort methods are implemented using a single thread. The reported times are averaged over ten runs. After each run, we call <code>System.gc()</code> to request a garbage collection run, which is not included in the measured execution time. The following figure shows the time to store the input data in memory, sort it, and read it back as objects.</p>
+
+<p><center>
+<img src="/img/blog/sort-benchmark.png" style="width:90%;margin:15px">
+</center></p>
+
+<p>We see that Flink’s sort on binary data using its own serializers significantly outperforms the other two methods. Compared to the object-on-heap method, we see that loading the data into memory is much faster. Since we actually collect the objects, there is no opportunity to reuse the object instances; every tuple has to be re-created. This is less efficient than Flink’s serializers (or Kryo serialization). On the other hand, reading objects from the heap comes for free compared to deserialization. In our benchmark, object cloning was more expensive than serialization and deserialization combined. Looking at the sorting time, we see that sorting on the binary representation is also faster than Java’s collection sort. Sorting data that was serialized using Kryo without a binary sort key is much slower than both other methods. This is due to the heavy deserialization overhead. Sorting the tuples on their String field is faster than sorting on the Integer field because the long-tailed value distribution significantly reduces the number of pair-wise comparisons. To get a better feeling for what is happening during sorting, we monitored the executing JVM using VisualVM. The following screenshots show heap memory usage, garbage collection activity, and CPU usage over the execution of 10 runs.</p>
+
+<table>
+<tr>
+    <td>&nbsp;</td>
+    <th><center><b>Garbage Collection</b></center></th>
+    <th><center><b>Memory Usage</b></center></th>
+</tr>
+<tr>
+    <td><b>Object-on-Heap (int)</b></td>
+    <td><img src="/img/blog/objHeap-int-gc.png" style="width:80%;margin:15px"></td>
+    <td><img src="/img/blog/objHeap-int-mem.png" style="width:80%;margin:15px"></td>
+</tr>
+<tr>
+    <td><b>Flink-Serialized (int)</b></td>
+    <td><img src="/img/blog/flinkSer-int-gc.png" style="width:80%;margin:15px"></td>
+    <td><img src="/img/blog/flinkSer-int-mem.png" style="width:80%;margin:15px"></td>
+</tr>
+<tr>
+    <td><b>Kryo-Serialized (int)</b></td>
+    <td><img src="/img/blog/kryoSer-int-gc.png" style="width:80%;margin:15px"></td>
+    <td><img src="/img/blog/kryoSer-int-mem.png" style="width:80%;margin:15px"></td>
+</tr>
+</table>
+
+<p>The experiments run single-threaded on an 8-core machine, so full utilization of one core corresponds to an overall utilization of only 12.5%. The screenshots show that operating on binary data significantly reduces garbage collection activity. For the object-on-heap approach, the garbage collector runs in very short intervals while the sort buffer is being filled and causes a lot of CPU usage even for a single processing thread (sorting itself does not trigger the garbage collector). The JVM garbage collects with multiple parallel threads, which explains the high overall CPU utilization. The methods that operate on serialized data, on the other hand, rarely trigger the garbage collector and have a much lower CPU utilization. In fact, the garbage collector does not run at all if the tuples are sorted on the Integer field using the flink-serialized method, because no objects need to be deserialized for pair-wise comparisons. The kryo-serialized method requires slightly more garbage collection since it does not use binary sort keys and deserializes two objects for each comparison.</p>
+
+<p>The memory usage charts show that the flink-serialized and kryo-serialized methods constantly occupy a high amount of memory (plus some objects needed for operation). This is due to the pre-allocation of MemorySegments. The actual memory usage is much lower because the sort buffers are not completely filled. The following table shows the memory consumption of each method. 10 million records result in about 280 MB of binary data (object data plus pointers and sort keys), depending on the serializer used and the presence and size of a binary sort key. Comparing this to the memory requirements of the object-on-heap approach, we see that operating on binary data can significantly improve memory efficiency. In our benchmark, more than twice as much data can be sorted in-memory if it is serialized into a sort buffer instead of being held as objects on the heap.</p>
+
+<p><table width="100%">
+<tr><td><b>Occupied Memory</b></td>
+    <td><b>Object-on-Heap</b></td>
+    <td><b>Flink-Serialized</b></td>
+    <td><b>Kryo-Serialized</b></td>
+</tr>
+<tr>
+    <td><b>Sort on Integer</b></td>
+    <td>approx. 700 MB (heap)</td>
+    <td>277 MB (sort buffer)</td>
+    <td>266 MB (sort buffer)</td>
+</tr>
+<tr>
+    <td><b>Sort on String</b></td>
+    <td>approx. 700 MB (heap)</td>
+    <td>315 MB (sort buffer)</td>
+    <td>266 MB (sort buffer)</td>
+</tr>
+</table><br></p>
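+
+<p>A rough back-of-envelope computation with assumed entry sizes lands in the same ballpark as the measured sort buffer sizes. The 8-byte pointer and 4-byte key below are assumptions for illustration, not measured values:</p>
+<div class="highlight"><pre><code class="language-java">// Back-of-envelope estimate of the binary footprint (assumed sizes, not measurements).
+public class MemoryEstimate {
+    public static void main(String[] args) {
+        long records = 10000000L;
+        long dataBytes  = records * (4 + 13); // 4-byte int plus ~13 bytes for a 12-char string
+        long indexBytes = records * (8 + 4);  // assumed: 8-byte pointer plus 4-byte sort key
+        System.out.println((dataBytes + indexBytes) / (1024 * 1024) + " MB"); // ~276 MB
+    }
+}
+</code></pre></div>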
+
+<p>To summarize, the experiments verify the previously stated benefits of operating on binary data. </p>
+
+<h3 id="we’re-not-done-yet!">We’re not done yet!</h3>
+
+<p>Apache Flink features quite a few advanced techniques to safely and efficiently process huge amounts of data with limited memory resources. However, there are a few points that could make Flink even more efficient. The Flink community is working on moving the managed memory to off-heap memory. This will allow for smaller JVMs, lower garbage collection overhead, and easier system configuration. With Flink’s Table API, the semantics of all operations, such as aggregations and projections, are known (in contrast to black-box user-defined functions). Hence, we can generate code for Table API operations that directly operates on binary data. Further improvements include serialization layouts that are tailored towards the operations applied on the binary data, as well as code generation for serializers and comparators.</p>
+
+<p>The groundwork (and a lot more) for operating on binary data is done, but there is still some room for making Flink even better and faster. If you are crazy about performance and like to juggle with lots of bits and bytes, join the Flink community!</p>
+
+<h3 id="tl;dr;-give-me-three-things-to-remember!">TL;DR; Give me three things to remember!</h3>
+
+<ul>
+<li>Flink’s active memory management avoids nasty <code>OutOfMemoryErrors</code> that kill your JVMs and reduces garbage collection overhead.</li>
+<li>Flink features a highly efficient data de/serialization stack that facilitates operations on binary data and makes more data fit into memory.</li>
+<li>Flink’s DBMS-style operators operate natively on binary data, yielding high in-memory performance and destaging gracefully to disk if necessary.</li>
+</ul>
+
+<p><br>
+<small>Written by Fabian Hueske (<a href="https://twitter.com/fhueske">@fhueske</a>).</small></p>
+</div>
+				<a href="/news/2015/05/11/Juggling-with-Bits-and-Bytes.html#disqus_thread">Juggling with Bits and Bytes</a>
+			</article>
+			
+			<article>
 				<h2><a href="/news/2015/04/13/release-0.9.0-milestone1.html">Announcing Flink 0.9.0-milestone1 preview release</a></h2>
 				<p class="meta">13 Apr 2015</p>
 
@@ -1663,78 +1853,6 @@ Flink serialization system improved a lot over time and by now surpasses the cap
 				<a href="/news/2014/11/18/hadoop-compatibility.html#disqus_thread">Hadoop Compatibility in Flink</a>
 			</article>
 			
-			<article>
-				<h2><a href="/news/2014/11/04/release-0.7.0.html">Apache Flink 0.7.0 available</a></h2>
-				<p class="meta">04 Nov 2014</p>
-
-				<div><p>We are pleased to announce the availability of Flink 0.7.0. This release includes new user-facing features as well as performance and bug fixes, brings the Scala and Java APIs in sync, and introduces Flink Streaming. A total of 34 people have contributed to this release, a big thanks to all of them!</p>
-
-<p>Download Flink 0.7.0 <a href="http://flink.incubator.apache.org/downloads.html">here</a></p>
-
-<p>See the release changelog <a href="https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&amp;version=12327648">here</a></p>
-
-<h2 id="overview-of-major-new-features">Overview of major new features</h2>
-
-<p><strong>Flink Streaming:</strong> The gem of the 0.7.0 release is undoubtedly Flink Streaming. Available currently in alpha, Flink Streaming provides a Java API on top of Apache Flink that can consume streaming data sources (e.g., from Apache Kafka, Apache Flume, and others) and process them in real time. A dedicated blog post on Flink Streaming and its performance is coming up here soon. You can check out the Streaming programming guide <a href="http://flink.incubator.apache.org/docs/0.7-incubating/streaming_guide.html">here</a>.</p>
-
-<p><strong>New Scala API:</strong> The Scala API has been completely rewritten. The Java and Scala APIs have now the same syntax and transformations and will be kept from now on in sync in every future release. See the new Scala API <a href="http://flink.incubator.apache.org/docs/0.7-incubating/programming_guide.html">here</a>.</p>
-
-<p><strong>Logical key expressions:</strong> You can now specify grouping and joining keys with logical names for member variables of POJO data types. For example, you can join two data sets as <code>persons.join(cities).where(“zip”).equalTo(“zipcode”)</code>. Read more <a href="http://flink.incubator.apache.org/docs/0.7-incubating/programming_guide.html#specifying-keys">here</a>.</p>
-
-<p><strong>Hadoop MapReduce compatibility:</strong> You can run unmodified Hadoop Mappers and Reducers (mapred API) in Flink, use all Hadoop data types, and read data with all Hadoop InputFormats.</p>
-
-<p><strong>Collection-based execution backend:</strong> The collection-based execution backend enables you to execute a Flink job as a simple Java collections program, bypassing completely the Flink runtime and optimizer. This feature is extremely useful for prototyping, and embedding Flink jobs in projects in a very lightweight manner.</p>
-
-<p><strong>Record API deprecated:</strong> The (old) Stratosphere Record API has been marked as deprecated and is planned for removal in the 0.9.0 release.</p>
-
-<p><strong>BLOB service:</strong> This release contains a new service to distribute jar files and other binary data among the JobManager, TaskManagers and the client. </p>
-
-<p><strong>Intermediate data sets:</strong> A major rewrite of the system internals introduces intermediate data sets as first class citizens. The internal state machine that tracks the distributed tasks has also been completely rewritten for scalability. While this is not visible as a user-facing feature yet, it is the foundation for several upcoming exciting features.</p>
-
-<p><strong>Note:</strong> Currently, there is limited support for Java 8 lambdas when compiling and running from an IDE. The problem is due to type erasure and whether Java compilers retain type information. We are currently working with the Eclipse and OpenJDK communities to resolve this.</p>
-
-<h2 id="contributors">Contributors</h2>
-
-<ul>
-<li>Tamas Ambrus</li>
-<li>Mariem Ayadi</li>
-<li>Marton Balassi</li>
-<li>Daniel Bali</li>
-<li>Ufuk Celebi</li>
-<li>Hung Chang</li>
-<li>David Eszes</li>
-<li>Stephan Ewen</li>
-<li>Judit Feher</li>
-<li>Gyula Fora</li>
-<li>Gabor Hermann</li>
-<li>Fabian Hueske</li>
-<li>Vasiliki Kalavri</li>
-<li>Kristof Kovacs</li>
-<li>Aljoscha Krettek</li>
-<li>Sebastian Kruse</li>
-<li>Sebastian Kunert</li>
-<li>Matyas Manninger</li>
-<li>Robert Metzger</li>
-<li>Mingliang Qi</li>
-<li>Till Rohrmann</li>
-<li>Henry Saputra</li>
-<li>Chesnay Schelper</li>
-<li>Moritz Schubotz</li>
-<li>Hung Sendoh Chang</li>
-<li>Peter Szabo</li>
-<li>Jonas Traub</li>
-<li>Fabian Tschirschnitz</li>
-<li>Artem Tsikiridis</li>
-<li>Kostas Tzoumas</li>
-<li>Timo Walther</li>
-<li>Daniel Warneke</li>
-<li>Tobias Wiens</li>
-<li>Yingjun Wu</li>
-</ul>
-</div>
-				<a href="/news/2014/11/04/release-0.7.0.html#disqus_thread">Apache Flink 0.7.0 available</a>
-			</article>
-			
 		</div>
 		<div class="col-md-2"></div>
 	</div>

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/blog/page2/index.html
----------------------------------------------------------------------
diff --git a/content/blog/page2/index.html b/content/blog/page2/index.html
index 306b5ef..0b37081 100644
--- a/content/blog/page2/index.html
+++ b/content/blog/page2/index.html
@@ -134,6 +134,78 @@
 		<div class="col-md-8">
 			
 			<article>
+				<h2><a href="/news/2014/11/04/release-0.7.0.html">Apache Flink 0.7.0 available</a></h2>
+				<p class="meta">04 Nov 2014</p>
+
+				<div><p>We are pleased to announce the availability of Flink 0.7.0. This release includes new user-facing features as well as performance and bug fixes, brings the Scala and Java APIs in sync, and introduces Flink Streaming. A total of 34 people have contributed to this release, a big thanks to all of them!</p>
+
+<p>Download Flink 0.7.0 <a href="http://flink.incubator.apache.org/downloads.html">here</a></p>
+
+<p>See the release changelog <a href="https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&amp;version=12327648">here</a></p>
+
+<h2 id="overview-of-major-new-features">Overview of major new features</h2>
+
+<p><strong>Flink Streaming:</strong> The gem of the 0.7.0 release is undoubtedly Flink Streaming. Available currently in alpha, Flink Streaming provides a Java API on top of Apache Flink that can consume streaming data sources (e.g., from Apache Kafka, Apache Flume, and others) and process them in real time. A dedicated blog post on Flink Streaming and its performance is coming up here soon. You can check out the Streaming programming guide <a href="http://flink.incubator.apache.org/docs/0.7-incubating/streaming_guide.html">here</a>.</p>
+
+<p><strong>New Scala API:</strong> The Scala API has been completely rewritten. The Java and Scala APIs now have the same syntax and transformations and will be kept in sync in every future release. See the new Scala API <a href="http://flink.incubator.apache.org/docs/0.7-incubating/programming_guide.html">here</a>.</p>
+
+<p><strong>Logical key expressions:</strong> You can now specify grouping and joining keys with logical names for member variables of POJO data types. For example, you can join two data sets as <code>persons.join(cities).where("zip").equalTo("zipcode")</code>. Read more <a href="http://flink.incubator.apache.org/docs/0.7-incubating/programming_guide.html#specifying-keys">here</a>.</p>
+
+<p><strong>Hadoop MapReduce compatibility:</strong> You can run unmodified Hadoop Mappers and Reducers (mapred API) in Flink, use all Hadoop data types, and read data with all Hadoop InputFormats.</p>
+
+<p><strong>Collection-based execution backend:</strong> The collection-based execution backend enables you to execute a Flink job as a simple Java collections program, completely bypassing the Flink runtime and optimizer. This feature is extremely useful for prototyping and for embedding Flink jobs in projects in a very lightweight manner.</p>
+
+<p><strong>Record API deprecated:</strong> The (old) Stratosphere Record API has been marked as deprecated and is planned for removal in the 0.9.0 release.</p>
+
+<p><strong>BLOB service:</strong> This release contains a new service to distribute jar files and other binary data among the JobManager, TaskManagers and the client. </p>
+
+<p><strong>Intermediate data sets:</strong> A major rewrite of the system internals introduces intermediate data sets as first class citizens. The internal state machine that tracks the distributed tasks has also been completely rewritten for scalability. While this is not visible as a user-facing feature yet, it is the foundation for several upcoming exciting features.</p>
+
+<p><strong>Note:</strong> Currently, there is limited support for Java 8 lambdas when compiling and running from an IDE. The problem is due to type erasure and whether Java compilers retain type information. We are currently working with the Eclipse and OpenJDK communities to resolve this.</p>
+
+<h2 id="contributors">Contributors</h2>
+
+<ul>
+<li>Tamas Ambrus</li>
+<li>Mariem Ayadi</li>
+<li>Marton Balassi</li>
+<li>Daniel Bali</li>
+<li>Ufuk Celebi</li>
+<li>Hung Chang</li>
+<li>David Eszes</li>
+<li>Stephan Ewen</li>
+<li>Judit Feher</li>
+<li>Gyula Fora</li>
+<li>Gabor Hermann</li>
+<li>Fabian Hueske</li>
+<li>Vasiliki Kalavri</li>
+<li>Kristof Kovacs</li>
+<li>Aljoscha Krettek</li>
+<li>Sebastian Kruse</li>
+<li>Sebastian Kunert</li>
+<li>Matyas Manninger</li>
+<li>Robert Metzger</li>
+<li>Mingliang Qi</li>
+<li>Till Rohrmann</li>
+<li>Henry Saputra</li>
+<li>Chesnay Schepler</li>
+<li>Moritz Schubotz</li>
+<li>Hung Sendoh Chang</li>
+<li>Peter Szabo</li>
+<li>Jonas Traub</li>
+<li>Fabian Tschirschnitz</li>
+<li>Artem Tsikiridis</li>
+<li>Kostas Tzoumas</li>
+<li>Timo Walther</li>
+<li>Daniel Warneke</li>
+<li>Tobias Wiens</li>
+<li>Yingjun Wu</li>
+</ul>
+</div>
+				<a href="/news/2014/11/04/release-0.7.0.html#disqus_thread">Apache Flink 0.7.0 available</a>
+			</article>
+			
+			<article>
 				<h2><a href="/news/2014/10/03/upcoming_events.html">Upcoming Events</a></h2>
 				<p class="meta">03 Oct 2014</p>
 
@@ -744,95 +816,6 @@ You can now press the &quot;Run&quot; button and see how Stratosphere executes t
 				<a href="/news/2014/01/26/optimizer_plan_visualization_tool.html#disqus_thread">Optimizer Plan Visualization Tool</a>
 			</article>
 			
-			<article>
-				<h2><a href="/news/2014/01/13/stratosphere-release-0.4.html">Stratosphere 0.4 Released</a></h2>
-				<p class="meta">13 Jan 2014</p>
-
-				<div><p>We are pleased to announce that version 0.4 of the Stratosphere system has been released. </p>
-
-<p>Our team has been working hard during the last few months to create an improved and stable Stratosphere version. The new version comes with many new features, usability and performance improvements in all levels, including a new Scala API for the concise specification of programs, a Pregel-like API, support for Yarn clusters, and major performance improvements. The system features now first-class support for iterative programs and thus covers traditional analytical use cases as well as data mining and graph processing use cases with great performance.</p>
-
-<p>In the course of the transition from v0.2 to v0.4 of the system, we have changed pre-existing APIs based on valuable user feedback. This means that, in the interest of easier programming, we have broken backwards compatibility and existing jobs must be adapted, as described in <a href="/blog/tutorial/2014/01/12/0.4-migration-guide.html">the migration guide</a>.</p>
-
-<p>This article will guide you through the feature list of the new release.</p>
-
-<h3 id="scala-programming-interface">Scala Programming Interface</h3>
-
-<p>The new Stratosphere version comes with a new programming API in Scala that supports very fluent and efficient programs that can be expressed with very few lines of code. The API uses Scala&#39;s native type system (no special boxed data types) and supports grouping and joining on types beyond key/value pairs. We use code analysis and code generation to transform Scala&#39;s data model to the Stratosphere runtime. Stratosphere Scala programs are optimized before execution by Stratosphere&#39;s optimizer just like Stratosphere Java programs.</p>
-
-<p>Learn more about the Scala API at the <a href="/docs/0.4/programming_guides/scala.html">Scala Programming Guide</a></p>
-
-<h3 id="iterations">Iterations</h3>
-
-<p>Stratosphere v0.4 introduces deep support for iterative algorithms, required by a large class of advanced analysis algorithms. In contrast to most other systems, &quot;looping over the data&quot; is done inside the system&#39;s runtime, rather than in the client. Individual iterations (supersteps) can be as fast as sub-second times. Loop-invariant data is automatically cached in memory.</p>
-
-<p>We support a special form of iterations called “delta iterations” that selectively modify only some elements of intermediate solution in each iteration. These are applicable to a variety of applications, e.g., use cases of Apache Giraph. We have observed speedups of 70x when using delta iterations instead of regular iterations.</p>
-
-<p>Read more about the new iteration feature in <a href="/docs/0.4/programming_guides/iterations.html">the documentation</a></p>
-
-<h3 id="hadoop-yarn-support">Hadoop YARN Support</h3>
-
-<p>YARN (Yet Another Resource Negotiator) is the major new feature of the recently announced <a href="http://hadoop.apache.org/docs/r2.2.0/">Hadoop 2.2</a>. It allows to share existing clusters with different runtimes. So you can run MapReduce alongside Storm and others. With the 0.4 release, Stratosphere supports YARN.
-Follow <a href="/docs/0.4/setup/yarn.html">our guide</a> on how to start a Stratosphere YARN session.</p>
-
-<h3 id="improved-scripting-language-meteor">Improved Scripting Language Meteor</h3>
-
-<p>The high-level language Meteor now natively serializes JSON trees for greater performance and offers additional operators and file formats. We greatly empowered the user to write crispier scripts by adding second-order functions, multi-output operators, and other syntactical sugar. For developers of Meteor packages, the API is much more comprehensive and allows to define custom data types that can be easily embedded in JSON trees through ad-hoc byte code generation.</p>
-
-<h3 id="spargel:-pregel-inspired-graph-processing">Spargel: Pregel Inspired Graph Processing</h3>
-
-<p>Spargel is a vertex-centric API similar to the interface proposed in Google&#39;s Pregel paper and implemented in Apache Giraph. Spargel is implemented in 500 lines of code (including comments) on top of Stratosphere&#39;s delta iterations feature. This confirms the flexibility of Stratosphere&#39;s architecture. </p>
-
-<h3 id="web-frontend">Web Frontend</h3>
-
-<p>Using the new web frontend, you can monitor the progress of Stratosphere jobs. For finished jobs, the frontend shows a breakdown of the execution times for each operator. The webclient also visualizes the execution strategies chosen by the optimizer.</p>
-
-<h3 id="accumulators">Accumulators</h3>
-
-<p>Stratosphere&#39;s accumulators allow program developers to compute simple statistics, such as counts, sums, min/max values, or histograms, as a side effect of the processing functions. An example application would be to count the total number of records/tuples processed by a function. Stratosphere will not launch additional tasks (reducers), but will compute the number &quot;on the fly&quot; as a side-product of the functions application to the data. The concept is similar to Hadoop&#39;s counters, but supports more types of aggregation.</p>
-
-<h3 id="refactored-apis">Refactored APIs</h3>
-
-<p>Based on valuable user feedback, we refactored the Java programming interface to make it more intuitive and easier to use. The basic concepts are still the same, however the naming of most interfaces changed and the structure of the code was adapted. When updating to the 0.4 release you will need to adapt your jobs and dependencies. A previous blog post has a guide to the necessary changes to adapt programs to Stratosphere 0.4.</p>
-
-<h3 id="local-debugging">Local Debugging</h3>
-
-<p>You can now test and debug Stratosphere jobs locally. The <a href="/docs/0.4/program_execution/local_executor.html">LocalExecutor</a> allows to execute Stratosphere Jobs from IDE&#39;s. The same code that runs on clusters also runs in a single JVM multi-threaded. The mode supports full debugging capabilities known from regular applications (placing breakpoints and stepping through the program&#39;s functions). An advanced mode supports simulating fully distributed operation locally.</p>
-
-<h3 id="miscellaneous">Miscellaneous</h3>
-
-<ul>
-<li>The configuration of Stratosphere has been changed to YAML</li>
-<li>HBase support</li>
-<li>JDBC Input format</li>
-<li>Improved Windows Compatibility: Batch-files to start Stratosphere on Windows and all unit tests passing on Windows.</li>
-<li>Stratosphere is available in Maven Central and Sonatype Snapshot Repository</li>
-<li>Improved build system that supports different Hadoop versions using Maven profiles</li>
-<li>Maven Archetypes for Stratosphere Jobs.</li>
-<li>Stability and Usability improvements with many bug fixes.</li>
-</ul>
-
-<h3 id="download-and-get-started-with-stratosphere-v0.4">Download and get started with Stratosphere v0.4</h3>
-
-<p>There are several options for getting started with Stratosphere. </p>
-
-<ul>
-<li>Download it on the <a href="/downloads">download page</a></li>
-<li>Start your program with the <a href="/quickstart/">Quick-start guides</a>.</li>
-<li>Complete <a href="/docs/0.4/">documentation and set-up guides</a></li>
-</ul>
-
-<h3 id="tell-us-what-you-think!">Tell us what you think!</h3>
-
-<p>Are you using, or planning to use Stratosphere? Sign up in our <a href="https://groups.google.com/forum/#!forum/stratosphere-dev">mailing list</a> and drop us a line.</p>
-
-<p>Have you found a bug? <a href="https://github.com/stratosphere/stratosphere">Post an issue</a> on GitHub.</p>
-
-<p>Follow us on <a href="https://twitter.com/stratosphere_eu">Twitter</a> and <a href="https://github.com/stratosphere/stratosphere">GitHub</a> to stay in touch with the latest news!</p>
-</div>
-				<a href="/news/2014/01/13/stratosphere-release-0.4.html#disqus_thread">Stratosphere 0.4 Released</a>
-			</article>
-			
 		</div>
 		<div class="col-md-2"></div>
 	</div>

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/blog/page3/index.html
----------------------------------------------------------------------
diff --git a/content/blog/page3/index.html b/content/blog/page3/index.html
index ef3aef1..cfddf5c 100644
--- a/content/blog/page3/index.html
+++ b/content/blog/page3/index.html
@@ -134,6 +134,95 @@
 		<div class="col-md-8">
 			
 			<article>
+				<h2><a href="/news/2014/01/13/stratosphere-release-0.4.html">Stratosphere 0.4 Released</a></h2>
+				<p class="meta">13 Jan 2014</p>
+
+				<div><p>We are pleased to announce that version 0.4 of the Stratosphere system has been released. </p>
+
+<p>Our team has been working hard during the last few months to create an improved and stable Stratosphere version. The new version comes with many new features and with usability and performance improvements at all levels, including a new Scala API for the concise specification of programs, a Pregel-like API, support for YARN clusters, and major performance improvements. The system now features first-class support for iterative programs and thus covers traditional analytical use cases as well as data mining and graph processing use cases with great performance.</p>
+
+<p>In the course of the transition from v0.2 to v0.4 of the system, we have changed pre-existing APIs based on valuable user feedback. This means that, in the interest of easier programming, we have broken backwards compatibility and existing jobs must be adapted, as described in <a href="/blog/tutorial/2014/01/12/0.4-migration-guide.html">the migration guide</a>.</p>
+
+<p>This article will guide you through the feature list of the new release.</p>
+
+<h3 id="scala-programming-interface">Scala Programming Interface</h3>
+
+<p>The new Stratosphere version comes with a new programming API in Scala that supports fluent and efficient programs expressed in very few lines of code. The API uses Scala&#39;s native type system (no special boxed data types) and supports grouping and joining on types beyond key/value pairs. We use code analysis and code generation to transform Scala&#39;s data model to the Stratosphere runtime. Stratosphere Scala programs are optimized before execution by Stratosphere&#39;s optimizer just like Stratosphere Java programs.</p>
+
+<p>Learn more about the Scala API at the <a href="/docs/0.4/programming_guides/scala.html">Scala Programming Guide</a></p>
+
+<h3 id="iterations">Iterations</h3>
+
+<p>Stratosphere v0.4 introduces deep support for iterative algorithms, required by a large class of advanced analysis algorithms. In contrast to most other systems, &quot;looping over the data&quot; is done inside the system&#39;s runtime, rather than in the client. Individual iterations (supersteps) can be as fast as sub-second times. Loop-invariant data is automatically cached in memory.</p>
+
+<p>We support a special form of iterations called “delta iterations” that selectively modify only some elements of the intermediate solution in each iteration. These are applicable to a variety of applications, e.g., the use cases of Apache Giraph. We have observed speedups of 70x when using delta iterations instead of regular iterations.</p>
+
+<p>Read more about the new iteration feature in <a href="/docs/0.4/programming_guides/iterations.html">the documentation</a></p>
+
+<h3 id="hadoop-yarn-support">Hadoop YARN Support</h3>
+
+<p>YARN (Yet Another Resource Negotiator) is the major new feature of the recently announced <a href="http://hadoop.apache.org/docs/r2.2.0/">Hadoop 2.2</a>. It allows existing clusters to be shared between different runtimes, so you can run MapReduce alongside Storm and others. With the 0.4 release, Stratosphere supports YARN.
+Follow <a href="/docs/0.4/setup/yarn.html">our guide</a> on how to start a Stratosphere YARN session.</p>
+
+<h3 id="improved-scripting-language-meteor">Improved Scripting Language Meteor</h3>
+
+<p>The high-level language Meteor now natively serializes JSON trees for greater performance and offers additional operators and file formats. We greatly empowered users to write crisper scripts by adding second-order functions, multi-output operators, and other syntactic sugar. For developers of Meteor packages, the API is much more comprehensive and allows defining custom data types that can be easily embedded in JSON trees through ad-hoc byte code generation.</p>
+
+<h3 id="spargel:-pregel-inspired-graph-processing">Spargel: Pregel Inspired Graph Processing</h3>
+
+<p>Spargel is a vertex-centric API similar to the interface proposed in Google&#39;s Pregel paper and implemented in Apache Giraph. Spargel is implemented in 500 lines of code (including comments) on top of Stratosphere&#39;s delta iterations feature. This confirms the flexibility of Stratosphere&#39;s architecture. </p>
+
+<h3 id="web-frontend">Web Frontend</h3>
+
+<p>Using the new web frontend, you can monitor the progress of Stratosphere jobs. For finished jobs, the frontend shows a breakdown of the execution times for each operator. The webclient also visualizes the execution strategies chosen by the optimizer.</p>
+
+<h3 id="accumulators">Accumulators</h3>
+
+<p>Stratosphere&#39;s accumulators allow program developers to compute simple statistics, such as counts, sums, min/max values, or histograms, as a side effect of the processing functions. An example application would be to count the total number of records/tuples processed by a function. Stratosphere will not launch additional tasks (reducers), but will compute the number &quot;on the fly&quot; as a side-product of the function&#39;s application to the data. The concept is similar to Hadoop&#39;s counters, but supports more types of aggregation.</p>
+
+<h3 id="refactored-apis">Refactored APIs</h3>
+
+<p>Based on valuable user feedback, we refactored the Java programming interface to make it more intuitive and easier to use. The basic concepts are still the same, however the naming of most interfaces changed and the structure of the code was adapted. When updating to the 0.4 release you will need to adapt your jobs and dependencies. A previous blog post has a guide to the necessary changes to adapt programs to Stratosphere 0.4.</p>
+
+<h3 id="local-debugging">Local Debugging</h3>
+
+<p>You can now test and debug Stratosphere jobs locally. The <a href="/docs/0.4/program_execution/local_executor.html">LocalExecutor</a> allows executing Stratosphere jobs from IDEs. The same code that runs on clusters also runs multi-threaded in a single JVM. This mode supports the full debugging capabilities known from regular applications (placing breakpoints and stepping through the program&#39;s functions). An advanced mode supports simulating fully distributed operation locally.</p>
+
+<h3 id="miscellaneous">Miscellaneous</h3>
+
+<ul>
+<li>The configuration of Stratosphere has been changed to YAML</li>
+<li>HBase support</li>
+<li>JDBC Input format</li>
+<li>Improved Windows Compatibility: Batch-files to start Stratosphere on Windows and all unit tests passing on Windows.</li>
+<li>Stratosphere is available in Maven Central and Sonatype Snapshot Repository</li>
+<li>Improved build system that supports different Hadoop versions using Maven profiles</li>
+<li>Maven Archetypes for Stratosphere Jobs.</li>
+<li>Stability and Usability improvements with many bug fixes.</li>
+</ul>
+
+<h3 id="download-and-get-started-with-stratosphere-v0.4">Download and get started with Stratosphere v0.4</h3>
+
+<p>There are several options for getting started with Stratosphere. </p>
+
+<ul>
+<li>Download it on the <a href="/downloads">download page</a></li>
+<li>Start your program with the <a href="/quickstart/">Quick-start guides</a>.</li>
+<li>Complete <a href="/docs/0.4/">documentation and set-up guides</a></li>
+</ul>
+
+<h3 id="tell-us-what-you-think!">Tell us what you think!</h3>
+
+<p>Are you using, or planning to use Stratosphere? Sign up in our <a href="https://groups.google.com/forum/#!forum/stratosphere-dev">mailing list</a> and drop us a line.</p>
+
+<p>Have you found a bug? <a href="https://github.com/stratosphere/stratosphere">Post an issue</a> on GitHub.</p>
+
+<p>Follow us on <a href="https://twitter.com/stratosphere_eu">Twitter</a> and <a href="https://github.com/stratosphere/stratosphere">GitHub</a> to stay in touch with the latest news!</p>
+</div>
+				<a href="/news/2014/01/13/stratosphere-release-0.4.html#disqus_thread">Stratosphere 0.4 Released</a>
+			</article>
+			
+			<article>
 				<h2><a href="/news/2014/01/12/0.4-migration-guide.html">Stratosphere Version 0.4 Migration Guide</a></h2>
 				<p class="meta">12 Jan 2014</p>
 

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/downloads.html
----------------------------------------------------------------------
diff --git a/content/downloads.html b/content/downloads.html
index 305da5b..449c121 100644
--- a/content/downloads.html
+++ b/content/downloads.html
@@ -130,16 +130,16 @@
     <div style="padding-top:50px" class="container">
         <h1>Downloads</h1>
 
-<script type="text/javascript">
+<script type="text/javascript">
 $( document ).ready(function() {
   // Handler for .ready() called.
-  $('.ga-track').on('click', function() {
+  $('.ga-track').on('click', function() {
     // we just use the element id for tracking with google analytics
-    ga('send', 'event', 'button', 'click', $(this).attr('id'));
-  });
+    ga('send', 'event', 'button', 'click', $(this).attr('id'));
+  });
 
-});
-</script>
+});
+</script>
 
 <p class="lead">Pick the <strong>Apache Flink</strong> package matching your Hadoop version.</p>
 

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/img/blog/data-serialization.png
----------------------------------------------------------------------
diff --git a/content/img/blog/data-serialization.png b/content/img/blog/data-serialization.png
new file mode 100644
index 0000000..80667f6
Binary files /dev/null and b/content/img/blog/data-serialization.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/img/blog/flinkSer-int-gc.png
----------------------------------------------------------------------
diff --git a/content/img/blog/flinkSer-int-gc.png b/content/img/blog/flinkSer-int-gc.png
new file mode 100644
index 0000000..29ec5a3
Binary files /dev/null and b/content/img/blog/flinkSer-int-gc.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/img/blog/flinkSer-int-mem.png
----------------------------------------------------------------------
diff --git a/content/img/blog/flinkSer-int-mem.png b/content/img/blog/flinkSer-int-mem.png
new file mode 100644
index 0000000..23750e1
Binary files /dev/null and b/content/img/blog/flinkSer-int-mem.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/img/blog/kryoSer-int-gc.png
----------------------------------------------------------------------
diff --git a/content/img/blog/kryoSer-int-gc.png b/content/img/blog/kryoSer-int-gc.png
new file mode 100644
index 0000000..4883d12
Binary files /dev/null and b/content/img/blog/kryoSer-int-gc.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/img/blog/kryoSer-int-mem.png
----------------------------------------------------------------------
diff --git a/content/img/blog/kryoSer-int-mem.png b/content/img/blog/kryoSer-int-mem.png
new file mode 100644
index 0000000..0ab4483
Binary files /dev/null and b/content/img/blog/kryoSer-int-mem.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/img/blog/memory-alloc.png
----------------------------------------------------------------------
diff --git a/content/img/blog/memory-alloc.png b/content/img/blog/memory-alloc.png
new file mode 100644
index 0000000..2e8d17b
Binary files /dev/null and b/content/img/blog/memory-alloc.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/img/blog/memory-mgmt.png
----------------------------------------------------------------------
diff --git a/content/img/blog/memory-mgmt.png b/content/img/blog/memory-mgmt.png
new file mode 100644
index 0000000..72e7602
Binary files /dev/null and b/content/img/blog/memory-mgmt.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/img/blog/objHeap-int-gc.png
----------------------------------------------------------------------
diff --git a/content/img/blog/objHeap-int-gc.png b/content/img/blog/objHeap-int-gc.png
new file mode 100644
index 0000000..6fca8df
Binary files /dev/null and b/content/img/blog/objHeap-int-gc.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/img/blog/objHeap-int-mem.png
----------------------------------------------------------------------
diff --git a/content/img/blog/objHeap-int-mem.png b/content/img/blog/objHeap-int-mem.png
new file mode 100644
index 0000000..a43e772
Binary files /dev/null and b/content/img/blog/objHeap-int-mem.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/img/blog/sort-benchmark.png
----------------------------------------------------------------------
diff --git a/content/img/blog/sort-benchmark.png b/content/img/blog/sort-benchmark.png
new file mode 100644
index 0000000..1fb796d
Binary files /dev/null and b/content/img/blog/sort-benchmark.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/img/blog/sorting-binary-data-1.png
----------------------------------------------------------------------
diff --git a/content/img/blog/sorting-binary-data-1.png b/content/img/blog/sorting-binary-data-1.png
new file mode 100644
index 0000000..814a76f
Binary files /dev/null and b/content/img/blog/sorting-binary-data-1.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/img/blog/sorting-binary-data-2.png
----------------------------------------------------------------------
diff --git a/content/img/blog/sorting-binary-data-2.png b/content/img/blog/sorting-binary-data-2.png
new file mode 100644
index 0000000..821c0da
Binary files /dev/null and b/content/img/blog/sorting-binary-data-2.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/img/blog/sorting-binary-data-3.png
----------------------------------------------------------------------
diff --git a/content/img/blog/sorting-binary-data-3.png b/content/img/blog/sorting-binary-data-3.png
new file mode 100644
index 0000000..e682e06
Binary files /dev/null and b/content/img/blog/sorting-binary-data-3.png differ


[3/3] flink-web git commit: Added blog post about how Flink operates on binary data

Posted by fh...@apache.org.
Added blog post about how Flink operates on binary data


Project: http://git-wip-us.apache.org/repos/asf/flink-web/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink-web/commit/7a9b4873
Tree: http://git-wip-us.apache.org/repos/asf/flink-web/tree/7a9b4873
Diff: http://git-wip-us.apache.org/repos/asf/flink-web/diff/7a9b4873

Branch: refs/heads/asf-site
Commit: 7a9b487311d45745958d8c949d10c90a74a97e73
Parents: 49b53df
Author: Fabian Hueske <fh...@gmail.com>
Authored: Tue May 12 11:40:10 2015 +0200
Committer: Fabian Hueske <fh...@gmail.com>
Committed: Tue May 12 11:40:10 2015 +0200

----------------------------------------------------------------------
 .../2015-05-11-Juggling-with-Bits-and-Bytes.md  | 192 ++++++++
 content/archive.html                            |  12 +-
 content/blog/feed.xml                           | 190 ++++++++
 content/blog/index.html                         | 262 +++++++---
 content/blog/page2/index.html                   | 161 +++----
 content/blog/page3/index.html                   |  89 ++++
 content/downloads.html                          |  12 +-
 content/img/blog/data-serialization.png         | Bin 0 -> 395455 bytes
 content/img/blog/flinkSer-int-gc.png            | Bin 0 -> 12405 bytes
 content/img/blog/flinkSer-int-mem.png           | Bin 0 -> 15689 bytes
 content/img/blog/kryoSer-int-gc.png             | Bin 0 -> 12018 bytes
 content/img/blog/kryoSer-int-mem.png            | Bin 0 -> 16747 bytes
 content/img/blog/memory-alloc.png               | Bin 0 -> 299878 bytes
 content/img/blog/memory-mgmt.png                | Bin 0 -> 360975 bytes
 content/img/blog/objHeap-int-gc.png             | Bin 0 -> 22921 bytes
 content/img/blog/objHeap-int-mem.png            | Bin 0 -> 17259 bytes
 content/img/blog/sort-benchmark.png             | Bin 0 -> 73111 bytes
 content/img/blog/sorting-binary-data-1.png      | Bin 0 -> 251023 bytes
 content/img/blog/sorting-binary-data-2.png      | Bin 0 -> 335940 bytes
 content/img/blog/sorting-binary-data-3.png      | Bin 0 -> 278178 bytes
 .../05/11/Juggling-with-Bits-and-Bytes.html     | 473 +++++++++++++++++++
 img/blog/data-serialization.png                 | Bin 0 -> 395455 bytes
 img/blog/flinkSer-int-gc.png                    | Bin 0 -> 12405 bytes
 img/blog/flinkSer-int-mem.png                   | Bin 0 -> 15689 bytes
 img/blog/kryoSer-int-gc.png                     | Bin 0 -> 12018 bytes
 img/blog/kryoSer-int-mem.png                    | Bin 0 -> 16747 bytes
 img/blog/memory-alloc.png                       | Bin 0 -> 299878 bytes
 img/blog/memory-mgmt.png                        | Bin 0 -> 360975 bytes
 img/blog/objHeap-int-gc.png                     | Bin 0 -> 22921 bytes
 img/blog/objHeap-int-mem.png                    | Bin 0 -> 17259 bytes
 img/blog/sort-benchmark.png                     | Bin 0 -> 73111 bytes
 img/blog/sorting-binary-data-1.png              | Bin 0 -> 251023 bytes
 img/blog/sorting-binary-data-2.png              | Bin 0 -> 335940 bytes
 img/blog/sorting-binary-data-3.png              | Bin 0 -> 278178 bytes
 34 files changed, 1218 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/_posts/2015-05-11-Juggling-with-Bits-and-Bytes.md
----------------------------------------------------------------------
diff --git a/_posts/2015-05-11-Juggling-with-Bits-and-Bytes.md b/_posts/2015-05-11-Juggling-with-Bits-and-Bytes.md
new file mode 100644
index 0000000..f0a6f87
--- /dev/null
+++ b/_posts/2015-05-11-Juggling-with-Bits-and-Bytes.md
@@ -0,0 +1,192 @@
+---
+layout: post
+title:  "Juggling with Bits and Bytes"
+date:   2015-05-11 10:00:00
+categories: news
+---
+
+###How Apache Flink operates on binary data
+
+Nowadays, a lot of open-source systems for analyzing large data sets are implemented in Java or other JVM-based programming languages. The most well-known example is Apache Hadoop, but newer frameworks such as Apache Spark, Apache Drill, and Apache Flink also run on JVMs. A common challenge that JVM-based data analysis engines face is storing large amounts of data in memory - both for caching and for efficient processing such as sorting and joining of data. Managing the JVM memory well makes the difference between a system that is hard to configure and has unpredictable reliability and performance, and a system that behaves robustly with few configuration knobs.
+
+In this blog post we discuss how Apache Flink manages memory, talk about its custom data de/serialization stack, and show how it operates on binary data.
+
+###Data Objects? Let’s put them on the heap!
+
+The most straightforward approach to processing lots of data in a JVM is to put it on the heap as objects and operate on these objects. Caching a data set as objects would be as simple as maintaining a list containing an object for each record. An in-memory sort would simply sort the list of objects.
+However, this approach has a few notable drawbacks. First of all, it is not trivial to watch and control heap memory usage when a lot of objects are created and invalidated constantly. Memory over-allocation instantly kills the JVM with an `OutOfMemoryError`. Another aspect is garbage collection on multi-GB JVMs that are flooded with new objects; the overhead of garbage collection in such environments can easily reach 50% or more. Finally, Java objects come with a certain space overhead depending on the JVM and platform. For data sets with many small objects, this can significantly reduce the effectively usable amount of memory. Given proficient system design and careful, use-case-specific system parameter tuning, heap memory usage can be more or less controlled and `OutOfMemoryErrors` avoided. However, such setups are rather fragile, especially if data characteristics or the execution environment change.
+
+
+###What is Flink doing about that?
+
+Apache Flink has its roots in a research project which aimed to combine the best technologies of MapReduce-based systems and parallel database systems. Coming from this background, Flink has always had its own way of processing data in-memory. Instead of putting lots of objects on the heap, Flink serializes objects into a fixed number of pre-allocated memory segments. Its DBMS-style sort and join algorithms operate as much as possible on this binary data to keep the de/serialization overhead at a minimum. If more data needs to be processed than can be kept in memory, Flink’s operators partially spill data to disk. In fact, a lot of Flink’s internal implementations look more like C/C++ than like common Java. The following figure gives a high-level overview of how Flink stores data serialized in memory segments and spills to disk if necessary.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/memory-mgmt.png" style="width:90%;margin:15px">
+</center>
+
+Flink’s style of active memory management and operating on binary data has several benefits: 
+
+1. **Memory-safe execution & efficient out-of-core algorithms.** Due to the fixed amount of allocated memory segments, it is trivial to monitor remaining memory resources. In case of a memory shortage, processing operators can efficiently write larger batches of memory segments to disk and later read them back. Consequently, `OutOfMemoryErrors` are effectively prevented.
+2. **Reduced garbage collection pressure.** Because all long-lived data is held in binary representation in Flink's managed memory, all data objects are short-lived or reused as mutable objects. Short-lived objects can be garbage-collected more efficiently, which significantly reduces garbage collection pressure. Right now, the pre-allocated memory segments are long-lived objects on the JVM heap, but the Flink community is actively working on allocating off-heap memory for this purpose. This effort will result in much smaller JVM heaps and facilitate even faster garbage collection cycles.
+3. **Space efficient data representation.** Java objects have a storage overhead which can be avoided if the data is stored in a binary representation.
+4. **Efficient binary operations & cache sensitivity.** Binary data can be efficiently compared and operated on given a suitable binary representation. Furthermore, binary representations can place related values, as well as hash codes, keys, and pointers, adjacently in memory. This usually gives data structures more cache-efficient access patterns.
+
+These properties of active memory management are very desirable in a data processing system for large-scale data analytics, but they have a significant price tag attached. Active memory management and operating on binary data are not trivial to implement - using `java.util.HashMap` is much easier than implementing a spillable hash-table backed by byte arrays and a custom serialization stack. Of course, Apache Flink is not the only JVM-based data processing system that operates on serialized binary data. Projects such as [Apache Drill](http://drill.apache.org/), [Apache Ignite (incubating)](http://ignite.incubator.apache.org/) or [Apache Geode (incubating)](http://projectgeode.org/) apply similar techniques, and it was recently announced that [Apache Spark](http://spark.apache.org/) will also evolve in this direction with [Project Tungsten](https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html).
+
+In the following, we discuss in detail how Flink allocates memory, de/serializes objects, and operates on binary data. We also show some performance numbers comparing processing objects on the heap with operating on binary data.
+
+
+###How does Flink allocate memory?
+
+A Flink worker, called TaskManager, is composed of several internal components such as an actor system for coordination with the Flink master, an IOManager that takes care of spilling data to disk and reading it back, and a MemoryManager that coordinates memory usage. In the context of this blog post, the MemoryManager is of most interest. 
+
+The MemoryManager takes care of allocating, accounting, and distributing MemorySegments to data processing operators such as sort and join operators. A [MemorySegment](https://github.com/apache/flink/blob/release-0.9.0-milestone-1/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java) is Flink’s distribution unit of memory and is backed by a regular Java byte array (32 KB by default). A MemorySegment provides very efficient write and read access to its backing byte array using Java’s unsafe methods. You can think of a MemorySegment as a custom-tailored version of Java’s NIO ByteBuffer. In order to operate on multiple MemorySegments as if they were a single larger chunk of consecutive memory, Flink uses logical views that implement Java’s `java.io.DataOutput` and `java.io.DataInput` interfaces.
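+
+The following is a rough sketch of the idea. It is a simplification for illustration, not Flink’s actual MemorySegment, which uses `sun.misc.Unsafe` for raw access instead of a ByteBuffer:
+
+```java
+import java.nio.ByteBuffer;
+
+// Simplified illustration of a memory segment: a fixed-size byte array
+// with typed, offset-based accessors. The real MemorySegment offers the
+// same style of API but accesses the array via sun.misc.Unsafe.
+public class SimpleSegment {
+    public static final int SIZE = 32 * 1024; // 32 KB, Flink's default
+    private final ByteBuffer buffer = ByteBuffer.wrap(new byte[SIZE]);
+
+    public void putInt(int offset, int value)   { buffer.putInt(offset, value); }
+    public int  getInt(int offset)              { return buffer.getInt(offset); }
+    public void putLong(int offset, long value) { buffer.putLong(offset, value); }
+    public long getLong(int offset)             { return buffer.getLong(offset); }
+}
+```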
+
+MemorySegments are allocated once at TaskManager start-up time and are destroyed when the TaskManager is shut down. Hence, they are reused and never garbage-collected over the whole lifetime of a TaskManager. After all internal data structures of a TaskManager have been initialized and all core services have been started, the MemoryManager starts creating MemorySegments. By default, 70% of the JVM heap that is available after service initialization is allocated by the MemoryManager. It is also possible to configure an absolute amount of managed memory. The remaining JVM heap is used for objects that are instantiated during task processing, including objects created by user-defined functions. The following figure shows the memory distribution in the TaskManager JVM after startup.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/memory-alloc.png" style="width:60%;margin:15px">
+</center>
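+
+A minimal sketch of the pre-allocation idea is shown below. It is a deliberately simplified illustration; the actual MemoryManager additionally tracks per-operator budgets and handles the release of segments:
+
+```java
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+// Simplified illustration of pre-allocating a fixed pool of segments at
+// start-up and handing them out on request. Running out of segments is
+// an explicit, recoverable condition instead of a JVM-killing OOM.
+public class SegmentPool {
+    private final Deque<byte[]> free = new ArrayDeque<>();
+
+    public SegmentPool(long managedBytes, int segmentSize) {
+        for (long i = 0; i < managedBytes / segmentSize; i++) {
+            free.push(new byte[segmentSize]); // allocated once, reused forever
+        }
+    }
+
+    public byte[] request() {
+        if (free.isEmpty()) {
+            // An operator would react by spilling segments to disk.
+            throw new IllegalStateException("managed memory exhausted");
+        }
+        return free.pop();
+    }
+
+    public void release(byte[] segment) { free.push(segment); }
+}
+```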
+
+###How does Flink serialize objects?
+
+The Java ecosystem offers several libraries to convert objects into a binary representation and back. Common alternatives are standard Java serialization, [Kryo](https://github.com/EsotericSoftware/kryo), [Apache Avro](http://avro.apache.org/), [Apache Thrift](http://thrift.apache.org/), or Google’s [Protobuf](https://github.com/google/protobuf). Flink includes its own custom serialization framework in order to control the binary representation of data. This is important because operating on binary data, such as comparing or even manipulating it, requires exact knowledge of the serialization layout. Further, configuring the serialization layout with respect to the operations that are performed on binary data can yield a significant performance boost. Flink’s serialization stack also leverages the fact that the types of the objects which go through de/serialization are exactly known before a program is executed.
+
+Flink programs can process data represented as arbitrary Java or Scala objects. Before a program is optimized, the data types at each processing step of the program’s data flow need to be identified. For Java programs, Flink features a reflection-based type extraction component to analyze the return types of user-defined functions. Scala programs are analyzed with help of the Scala compiler. Flink represents each data type with a [TypeInformation](https://github.com/apache/flink/blob/release-0.9.0-milestone-1/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java). Flink has TypeInformations for several kinds of data types, including:
+
+* BasicTypeInfo: Any (boxed) Java primitive type or java.lang.String.
+* BasicArrayTypeInfo: Any array of a (boxed) Java primitive type or java.lang.String.
+* WritableTypeInfo: Any implementation of Hadoop’s Writable interface.
+* TupleTypeInfo: Any Flink tuple (Tuple1 to Tuple25). Flink tuples are Java representations for fixed-length tuples with typed fields.
+* CaseClassTypeInfo: Any Scala CaseClass (including Scala tuples).
+* PojoTypeInfo: Any POJO (Java or Scala), i.e., an object with all fields either being public or accessible through getters and setters that follow the common naming conventions.
+* GenericTypeInfo: Any data type that cannot be identified as another type.
+
+Each TypeInformation provides a serializer for the data type it represents. For example, a BasicTypeInfo returns a serializer that writes the respective primitive type, the serializer of a WritableTypeInfo delegates de/serialization to the write() and readFields() methods of the object implementing Hadoop’s Writable interface, and a GenericTypeInfo returns a serializer that delegates serialization to Kryo. Object serialization to a DataOutput which is backed by Flink MemorySegments goes automatically through Java’s efficient unsafe operations. For data types that can be used as keys, i.e., compared and hashed, the TypeInformation provides TypeComparators. TypeComparators compare and hash objects and can - depending on the concrete data type - also efficiently compare binary representations and extract fixed-length binary key prefixes. 
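+
+The snippet below sketches a serialization round trip through this stack. It is a hedged example: the stream-wrapper class names are taken from later Flink releases (`org.apache.flink.core.memory`), so the exact names may differ slightly in the 0.9-era code base.
+
+```java
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+public class SerializerRoundTrip {
+    public static void main(String[] args) throws Exception {
+        Tuple2<Integer, String> record = new Tuple2<>(42, "flink");
+
+        // Extract the TypeInformation of the record and get its serializer.
+        TypeInformation<Tuple2<Integer, String>> info = TypeExtractor.getForObject(record);
+        TypeSerializer<Tuple2<Integer, String>> serializer =
+                info.createSerializer(new ExecutionConfig());
+
+        // Serialize into a DataOutputView. Inside Flink, such a view is
+        // backed by MemorySegments; here a byte stream stands in for it.
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        serializer.serialize(record, new DataOutputViewStreamWrapper(out));
+
+        // ... and deserialize it again.
+        Tuple2<Integer, String> copy = serializer.deserialize(
+                new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray())));
+        System.out.println(copy); // prints (42,flink)
+    }
+}
+```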
+
+Tuple, Pojo, and CaseClass types are composite types, i.e., containers for one or more possibly nested data types. As such, their serializers and comparators are also composite and delegate the serialization and comparison of their member data types to the respective serializers and comparators. The following figure illustrates the serialization of a (nested) `Tuple3<Integer, Double, Person>` object where `Person` is a POJO and defined as follows:
+
+```java
+public class Person {
+    public int id;
+    public String name;
+}
+```
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/data-serialization.png" style="width:80%;margin:15px">
+</center>
+
+Flink’s type system can be easily extended by providing custom TypeInformations, Serializers, and Comparators to improve the performance of serializing and comparing custom data types. 
+
+
+###How does Flink operate on binary data?
+
+Similar to many other data processing APIs (including SQL), Flink’s APIs provide transformations to group, sort, and join data sets. These transformations operate on potentially very large data sets. Relational database systems have featured very efficient algorithms for these purposes for several decades, including external merge-sort, merge-join, and hybrid hash-join. Flink builds on this technology, but generalizes it to handle arbitrary objects using its custom serialization and comparison stack. In the following, we show how Flink operates on binary data, using Flink’s in-memory sort algorithm as an example.
+
+Flink assigns a memory budget to its data processing operators. Upon initialization, a sort algorithm requests its memory budget from the MemoryManager and receives a corresponding set of MemorySegments. The set of MemorySegments becomes the memory pool of a so-called sort buffer which collects the data that is to be sorted. The following figure illustrates how data objects are serialized into the sort buffer.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/sorting-binary-data-1.png" style="width:90%;margin:15px">
+</center>
+
+The sort buffer is internally organized into two memory regions. The first region holds the full binary data of all objects. The second region contains pointers to the full binary object data and - depending on the key data type - fixed-length sort keys. When an object is added to the sort buffer, its binary data is appended to the first region, and a pointer (and possibly a key) is appended to the second region. The separation of actual data from pointers plus fixed-length keys serves two purposes: it enables efficient swapping of fixed-length entries (key+pointer) and reduces the amount of data that needs to be moved when sorting. If the sort key is a variable-length data type such as a String, the fixed-length sort key must be a prefix key, such as the first n characters of a String. Note that not all data types provide a fixed-length (prefix) sort key. When serializing objects into the sort buffer, both memory regions are extended with MemorySegments from the memory pool. Once the memory pool is empty and no more objects can be added, the sort buffer is completely filled and can be sorted. Flink’s sort buffer provides methods to compare and swap elements. This makes the actual sort algorithm pluggable. By default, Flink uses a Quicksort implementation which can fall back to HeapSort.
+The following figure shows how two objects are compared.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/sorting-binary-data-2.png" style="width:80%;margin:15px">
+</center>
+
+The sort buffer compares two elements by comparing their binary fixed-length sort keys. A comparison can be decided on the binary keys alone if it is either done on a full key (not a prefix key) or if the binary prefix keys are not equal. If the prefix keys are equal (or the sort key data type does not provide a binary prefix key), the sort buffer follows the pointers to the actual object data, deserializes both objects, and compares them. Depending on the result of the comparison, the sort algorithm decides whether to swap the compared elements or not. The sort buffer swaps two elements by moving their fixed-length keys and pointers. The actual data is not moved. Once the sort algorithm finishes, the pointers in the sort buffer are correctly ordered. The following figure shows how the sorted data is returned from the sort buffer.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/sorting-binary-data-3.png" style="width:80%;margin:15px">
+</center>
+
+The sorted data is returned by sequentially reading the pointer region of the sort buffer, skipping the sort keys and following the sorted pointers to the actual data. This data is either deserialized and returned as objects or the binary representation is copied and written to disk in case of an external merge-sort (see this [blog post on joins in Flink](http://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html)).
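+
+To make the key+pointer scheme concrete, here is a drastically simplified sketch in plain Java. It is an illustration only, not Flink’s actual sort buffer: it assumes 4-byte prefix keys, packs each key and pointer into one `long`, and omits the deserialization fall-back for equal prefixes:
+
+```java
+import java.util.Arrays;
+
+// Records live in one byte region; a separate index holds fixed-length
+// (prefixKey, pointer) entries. Sorting moves only the 8-byte index
+// entries; the record bytes themselves never move.
+public class KeyPointerSortSketch {
+    private final byte[] records;  // region 1: serialized record data
+    private final long[] index;    // region 2: [32-bit prefix key | 32-bit pointer]
+    private int count;
+
+    public KeyPointerSortSketch(int dataBytes, int maxRecords) {
+        records = new byte[dataBytes];
+        index = new long[maxRecords];
+    }
+
+    // A real add() would also serialize the record at `offset` into `records`.
+    public void add(int prefixKey, int offset) {
+        index[count++] = ((long) prefixKey << 32) | (offset & 0xFFFFFFFFL);
+    }
+
+    public void sort() {
+        // Comparing the packed longs compares the prefix keys first. If two
+        // prefixes are equal, Flink would follow the pointers, deserialize
+        // both records, and compare the full values (omitted here).
+        Arrays.sort(index, 0, count);
+    }
+
+    // Reading back: follow the sorted pointers into the record region.
+    public int pointerAt(int i) { return (int) index[i]; }
+}
+```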
+
+
+###Show me numbers!
+
+So, what does operating on binary data mean for performance? We’ll run a benchmark that sorts 10 million `Tuple2<Integer, String>` objects to find out. The values of the Integer field are sampled from a uniform distribution. The String field values have a length of 12 characters and are sampled from a long-tail distribution. The input data is provided by an iterator that returns a mutable object, i.e., the same tuple object instance is returned with different field values. Flink uses this technique when reading data from memory, network, or disk to avoid unnecessary object instantiations. The benchmarks are run in a JVM with 900 MB heap size, which is approximately the amount of memory required to store and sort 10 million tuple objects on the heap without dying of an `OutOfMemoryError`. We sort the tuples on the Integer field and on the String field using three sorting methods:
+
+1. **Object-on-heap.** The tuples are stored in a regular `java.util.ArrayList` with initial capacity set to 10 million entries and sorted using Java’s regular collection sort (a sketch of this baseline follows the list).
+2. **Flink-serialized.** The tuple fields are serialized into a sort buffer of 600 MB size using Flink’s custom serializers, sorted as described above, and finally deserialized again. When sorting on the Integer field, the full Integer is used as the sort key such that the sort happens entirely on binary data (no deserialization of objects required). For sorting on the String field, an 8-byte prefix key is used and tuple objects are deserialized if the prefix keys are equal.
+3. **Kryo-serialized.** The tuple fields are serialized into a sort buffer of 600 MB size using Kryo serialization and sorted without binary sort keys. This means that each pair-wise comparison requires two objects to be deserialized.
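+
+Below is a compilable sketch of the object-on-heap baseline. It is not the code the numbers below were produced with; in particular, the string generation is a stand-in for the 12-character, long-tail-distributed values of the benchmark, and it should be run with a sufficiently large heap (e.g., `-Xmx900m`):
+
+```java
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+public class ObjectOnHeapBaseline {
+    public static void main(String[] args) {
+        final int n = 10_000_000;
+        Random rnd = new Random();
+        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>(n);
+        for (int i = 0; i < n; i++) {
+            // Every element must be a fresh object; the mutable input
+            // instance cannot be reused when collecting objects.
+            data.add(new Tuple2<>(rnd.nextInt(), "s-" + rnd.nextInt(1_000_000)));
+        }
+
+        long start = System.nanoTime();
+        Collections.sort(data, new Comparator<Tuple2<Integer, String>>() {
+            @Override
+            public int compare(Tuple2<Integer, String> a, Tuple2<Integer, String> b) {
+                return a.f0.compareTo(b.f0); // sort on the Integer field
+            }
+        });
+        System.out.println("sorted in " + (System.nanoTime() - start) / 1_000_000 + " ms");
+
+        System.gc(); // as in the benchmark: GC between runs, excluded from timing
+    }
+}
+```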
+
+All sort methods are implemented using a single thread. The reported times are averaged over ten runs. After each run, we call `System.gc()` to request a garbage collection run; this garbage collection does not count towards the measured execution time. The following figure shows the time to store the input data in memory, sort it, and read it back as objects.
+
+<center>
+<img src="{{ site.baseurl }}/img/blog/sort-benchmark.png" style="width:90%;margin:15px">
+</center>
+
+We see that Flink’s sort on binary data using its own serializers significantly outperforms the other two methods. Compared to the object-on-heap method, loading the data into memory is much faster. Since we actually collect the objects, there is no opportunity to reuse the object instances; every tuple has to be re-created. This is less efficient than Flink’s serializers (or Kryo serialization). On the other hand, reading objects from the heap comes for free compared to deserialization. In our benchmark, object cloning was more expensive than serialization and deserialization combined. Looking at the sorting time, we see that sorting on the binary representation is also faster than Java’s collection sort. Sorting data that was serialized using Kryo without binary sort keys is much slower than both other methods. This is due to the heavy deserialization overhead. Sorting the tuples on their String field is faster than sorting on the Integer field due to the long-tailed value distribution which significantly reduces the number of pair-wise comparisons. To get a better feeling for what happens during sorting, we monitored the executing JVM using VisualVM. The following screenshots show heap memory usage, garbage collection activity, and CPU usage over the execution of 10 runs.
+
+<table>
+<tr>
+	<td>&nbsp;</td>
+	<th><center><b>Garbage Collection</b></center></th>
+	<th><center><b>Memory Usage</b></center></th>
+</tr>
+<tr>
+	<td><b>Object-on-Heap (int)</b></td>
+	<td><img src="{{ site.baseurl }}/img/blog/objHeap-int-gc.png" style="width:80%;margin:15px"></td>
+	<td><img src="{{ site.baseurl }}/img/blog/objHeap-int-mem.png" style="width:80%;margin:15px"></td>
+</tr>
+<tr>
+	<td><b>Flink-Serialized (int)</b></td>
+	<td><img src="{{ site.baseurl }}/img/blog/flinkSer-int-gc.png" style="width:80%;margin:15px"></td>
+	<td><img src="{{ site.baseurl }}/img/blog/flinkSer-int-mem.png" style="width:80%;margin:15px"></td>
+</tr>
+<tr>
+	<td><b>Kryo-Serialized (int)</b></td>
+	<td><img src="{{ site.baseurl }}/img/blog/kryoSer-int-gc.png" style="width:80%;margin:15px"></td>
+	<td><img src="{{ site.baseurl }}/img/blog/kryoSer-int-mem.png" style="width:80%;margin:15px"></td>
+</tr>
+</table>
+
+The experiments run single-threaded on an 8-core machine, so full utilization of one core corresponds to only a 12.5% overall utilization. The screenshots show that operating on binary data significantly reduces garbage collection activity. For the object-on-heap approach, the garbage collector runs in very short intervals while filling the sort buffer and causes a lot of CPU usage even for a single processing thread (sorting itself does not trigger the garbage collector). The JVM garbage collects with multiple parallel threads, explaining the high overall CPU utilization. On the other hand, the methods that operate on serialized data rarely trigger the garbage collector and have a much lower CPU utilization. In fact, the garbage collector does not run at all if the tuples are sorted on the Integer field using the flink-serialized method, because no objects need to be deserialized for pair-wise comparisons. The kryo-serialized method requires slightly more garbage collection since it does not use binary sort keys and deserializes two objects for each comparison.
+
+The memory usage charts show that the flink-serialized and kryo-serialized methods constantly occupy a high amount of memory (plus some objects for operation). This is due to the pre-allocation of MemorySegments. The actual memory usage is much lower because the sort buffers are not completely filled. The following table shows the memory consumption of each method. 10 million records result in about 280 MB of binary data (object data plus pointers and sort keys), depending on the serializer used and the presence and size of a binary sort key. Comparing this to the memory requirements of the object-on-heap approach, we see that operating on binary data can significantly improve memory efficiency. In our benchmark, more than twice as much data can be sorted in-memory if it is serialized into a sort buffer instead of being held as objects on the heap.
+
+
+<table width="100%">
+<tr><td><b>Occupied Memory</b></td>
+	<td><b>Object-on-Heap</b></td>
+	<td><b>Flink-Serialized</b></td>
+	<td><b>Kryo-Serialized</b></td>
+</tr>
+<tr>
+	<td><b>Sort on Integer</b></td>
+	<td>approx. 700 MB (heap)</td>
+	<td>277 MB (sort buffer)</td>
+	<td>266 MB (sort buffer)</td>
+</tr>
+<tr>
+	<td><b>Sort on String</b></td>
+	<td>approx. 700 MB (heap)</td>
+	<td>315 MB (sort buffer)</td>
+	<td>266 MB (sort buffer)</td>
+</tr>
+</table><br>
+
+To summarize, the experiments verify the previously stated benefits of operating on binary data. 
+
+
+###We’re not done yet!
+
+Apache Flink features quite a few advanced techniques to safely and efficiently process huge amounts of data with limited memory resources. However, there are a few points that could make Flink even more efficient. The Flink community is working on moving the managed memory to off-heap memory. This will allow for smaller JVMs, lower garbage collection overhead, and also easier system configuration. With Flink’s Table API, the semantics of all operations such as aggregations and projections are known (in contrast to black-box user-defined functions). Hence, we can generate code for Table API operations that operates directly on binary data. Further improvements include serialization layouts that are tailored to the operations applied on the binary data, as well as code generation for serializers and comparators.
+
+The groundwork (and a lot more) for operating on binary data is done, but there is still room for making Flink even better and faster. If you are crazy about performance and like to juggle with lots of bits and bytes, join the Flink community!
+
+
+###TL;DR; Give me three things to remember!
+
+* Flink’s active memory management avoids nasty `OutOfMemoryErrors` that kill your JVMs and reduces garbage collection overhead.
+* Flink features a highly efficient data de/serialization stack that facilitates operations on binary data and makes more data fit into memory.
+* Flink’s DBMS-style operators operate natively on binary data yielding high performance in-memory and destage gracefully to disk if necessary.
+
+
+<br>
+<small>Written by Fabian Hueske ([@fhueske](https://twitter.com/fhueske)).</small>

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/archive.html
----------------------------------------------------------------------
diff --git a/content/archive.html b/content/archive.html
index b7c4edb..3c216e5 100644
--- a/content/archive.html
+++ b/content/archive.html
@@ -130,16 +130,16 @@
     <div style="padding-top:50px" class="container">
         <h1>Archive</h1>
 
-<script type="text/javascript">
+<p><script type="text/javascript">
 $( document ).ready(function() {
   // Handler for .ready() called.
-  $('.ga-track').on('click', function() {
+  $(&#39;.ga-track&#39;).on(&#39;click&#39;, function() {
     // we just use the element id for tracking with google analytics
-    ga('send', 'event', 'button', 'click', $(this).attr('id'));
-  });
+    ga(&#39;send&#39;, &#39;event&#39;, &#39;button&#39;, &#39;click&#39;, $(this).attr(&#39;id&#39;));
+  });</p>
 
-});
-</script>
+<p>});
+</script></p>
 
 <p>This page lists old versions of Flink including their documentation.</p>
 

http://git-wip-us.apache.org/repos/asf/flink-web/blob/7a9b4873/content/blog/feed.xml
----------------------------------------------------------------------
diff --git a/content/blog/feed.xml b/content/blog/feed.xml
index d3374f9..f428af3 100644
--- a/content/blog/feed.xml
+++ b/content/blog/feed.xml
@@ -7,6 +7,196 @@
 <atom:link href="http://flink.apache.org/blog/feed.xml" rel="self" type="application/rss+xml" />
 
 <item>
+<title>Juggling with Bits and Bytes</title>
+<pubDate>Mon, 11 May 2015 12:00:00 +0200</pubDate>
+<link>http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html</link>
+<guid isPermaLink="true">/news/2015/05/11/Juggling-with-Bits-and-Bytes.html</guid>
+</item>
+
+<item>
 <title>Announcing Flink 0.9.0-milestone1 preview release</title>
 <description>&lt;p&gt;The Apache Flink community is pleased to announce the availability of
 the 0.9.0-milestone-1 release. The release is a preview of the