Posted to commits@flink.apache.org by uc...@apache.org on 2016/08/24 09:27:09 UTC

[49/51] [partial] flink git commit: [FLINK-4317, FLIP-3] [docs] Restructure docs

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/batch/examples.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/examples.md b/docs/apis/batch/examples.md
deleted file mode 100644
index 3523185..0000000
--- a/docs/apis/batch/examples.md
+++ /dev/null
@@ -1,521 +0,0 @@
----
-title:  "Bundled Examples"
-
-# Sub-level navigation
-sub-nav-group: batch
-sub-nav-pos: 5
-sub-nav-title: Examples
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-The following example programs showcase different applications of Flink
-from simple word counting to graph algorithms. The code samples illustrate the
-use of [Flink's API](index.html).
-
-The full source code of the following and more examples can be found in the __flink-examples-batch__
-or __flink-examples-streaming__ module of the Flink source repository.
-
-* This will be replaced by the TOC
-{:toc}
-
-
-## Running an example
-
-To run a Flink example, you need a running Flink instance. The "Setup" tab in the navigation describes various ways of starting Flink.
-
-The easiest way is running the `./bin/start-local.sh` script, which will start a JobManager locally.
-
-Each binary release of Flink contains an `examples` directory with jar files for each of the examples on this page.
-
-To run the WordCount example, issue the following command:
-
-~~~bash
-./bin/flink run ./examples/batch/WordCount.jar
-~~~
-
-The other examples can be started in a similar way.
-
-Note that many examples run without passing any arguments, by using built-in data. To run WordCount with real data, you have to pass the path to the data:
-
-~~~bash
-./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result
-~~~
-
-Note that non-local file systems require a scheme prefix, such as `hdfs://`.
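-
-For example, a WordCount run that reads from and writes to HDFS might look like this (the `namenode` host, port, and paths below are placeholders):
-
-~~~bash
-./bin/flink run ./examples/batch/WordCount.jar \
-    --input hdfs://namenode:9000/path/to/some/text/data \
-    --output hdfs://namenode:9000/path/to/result
-~~~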
-
-
-## Word Count
-WordCount is the "Hello World" of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: First, the text is split into individual words. Second, the words are grouped and counted.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-~~~java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-DataSet<String> text = env.readTextFile("/path/to/file");
-
-DataSet<Tuple2<String, Integer>> counts =
-        // split up the lines in pairs (2-tuples) containing: (word,1)
-        text.flatMap(new Tokenizer())
-        // group by the tuple field "0" and sum up tuple field "1"
-        .groupBy(0)
-        .sum(1);
-
-counts.writeAsCsv(outputPath, "\n", " ");
-
-// User-defined functions
-public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-
-    @Override
-    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-        // normalize and split the line
-        String[] tokens = value.toLowerCase().split("\\W+");
-
-        // emit the pairs
-        for (String token : tokens) {
-            if (token.length() > 0) {
-                out.collect(new Tuple2<String, Integer>(token, 1));
-            }   
-        }
-    }
-}
-~~~
-
-The {% gh_link /flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java  "WordCount example" %} implements the above described algorithm with input parameters: `--input <path> --output <path>`. As test data, any text file will do.
-
-</div>
-<div data-lang="scala" markdown="1">
-
-~~~scala
-val env = ExecutionEnvironment.getExecutionEnvironment
-
-// get input data
-val text = env.readTextFile("/path/to/file")
-
-val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
-  .map { (_, 1) }
-  .groupBy(0)
-  .sum(1)
-
-counts.writeAsCsv(outputPath, "\n", " ")
-~~~
-
-The {% gh_link /flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala  "WordCount example" %} implements the above described algorithm with input parameters: `--input <path> --output <path>`. As test data, any text file will do.
-
-
-</div>
-</div>
-
-## Page Rank
-
-The PageRank algorithm computes the "importance" of pages in a graph defined by links, which point from one page to another. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors, and computes its new rank as a taxed sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine, which uses the importance of webpages to rank the results of search queries.
-
-In this simple example, PageRank is implemented with a [bulk iteration](iterations.html) and a fixed number of iterations.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-~~~java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-// read the pages and initial ranks by parsing a CSV file
-DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
-        .types(Long.class, Double.class);
-
-// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
-DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);
-
-// set iterative data set
-IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);
-
-DataSet<Tuple2<Long, Double>> newRanks = iteration
-        // join pages with outgoing edges and distribute rank
-        .join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
-        // collect and sum ranks
-        .groupBy(0).sum(1)
-        // apply dampening factor
-        .map(new Dampener(DAMPENING_FACTOR, numPages));
-
-DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
-        newRanks,
-        newRanks.join(iteration).where(0).equalTo(0)
-        // termination condition
-        .filter(new EpsilonFilter()));
-
-finalPageRanks.writeAsCsv(outputPath, "\n", " ");
-
-// User-defined functions
-
-public static final class JoinVertexWithEdgesMatch
-                    implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>,
-                                            Tuple2<Long, Double>> {
-
-    @Override
-    public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
-                        Collector<Tuple2<Long, Double>> out) {
-        Long[] neighbors = adj.f1;
-        double rank = page.f1;
-        double rankToDistribute = rank / ((double) neighbors.length);
-
-        for (int i = 0; i < neighbors.length; i++) {
-            out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
-        }
-    }
-}
-
-public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
-    private final double dampening, randomJump;
-
-    public Dampener(double dampening, double numVertices) {
-        this.dampening = dampening;
-        this.randomJump = (1 - dampening) / numVertices;
-    }
-
-    @Override
-    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
-        value.f1 = (value.f1 * dampening) + randomJump;
-        return value;
-    }
-}
-
-public static final class EpsilonFilter
-                implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
-
-    @Override
-    public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
-        return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
-    }
-}
-~~~
-
-The {% gh_link /flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java "PageRank program" %} implements the above example.
-It requires the following parameters to run: `--pages <path> --links <path> --output <path> --numPages <n> --iterations <n>`.
-
-</div>
-<div data-lang="scala" markdown="1">
-
-~~~scala
-// User-defined types
-case class Link(sourceId: Long, targetId: Long)
-case class Page(pageId: Long, rank: Double)
-case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
-
-// set up execution environment
-val env = ExecutionEnvironment.getExecutionEnvironment
-
-// read the pages and initial ranks by parsing a CSV file
-val pages = env.readCsvFile[Page](pagesInputPath)
-
-// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
-val links = env.readCsvFile[Link](linksInputPath)
-
-// assign initial ranks to pages
-val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / numPages))
-
-// build adjacency list from link input
-val adjacencyLists = links
-  // initialize lists
-  .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
-  // concatenate lists
-  .groupBy("sourceId").reduce {
-  (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
-  }
-
-// start iteration
-val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
-  currentRanks =>
-    val newRanks = currentRanks
-      // distribute ranks to target pages
-      .join(adjacencyLists).where("pageId").equalTo("sourceId") {
-        (page, adjacent, out: Collector[Page]) =>
-        for (targetId <- adjacent.targetIds) {
-          out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
-        }
-      }
-      // collect ranks and sum them up
-      .groupBy("pageId").aggregate(SUM, "rank")
-      // apply dampening factor
-      .map { p =>
-        Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
-      }
-
-    // terminate if no rank update was significant
-    val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
-      (current, next, out: Collector[Int]) =>
-        // check for significant update
-        if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
-    }
-
-    (newRanks, termination)
-}
-
-val result = finalRanks
-
-// emit result
-result.writeAsCsv(outputPath, "\n", " ")
-~~~
-
-The {% gh_link /flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala "PageRank program" %} implements the above example.
-It requires the following parameters to run: `--pages <path> --links <path> --output <path> --numPages <n> --iterations <n>`.
-</div>
-</div>
-
-Input files are plain text files and must be formatted as follows:
-- Pages are represented by (long) IDs, separated by new-line characters.
-    * For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63.
-- Links are represented as pairs of page IDs, separated by a space character. Links are separated by new-line characters:
-    * For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).
-
-This simple implementation requires that each page has at least one incoming and at least one outgoing link (a page can point to itself).
-
-## Connected Components
-
-The Connected Components algorithm identifies the connected parts of a larger graph by assigning all vertices in the same connected part the same component ID. Similar to PageRank, Connected Components is an iterative algorithm. In each step, each vertex propagates its current component ID to all its neighbors. A vertex accepts the component ID from a neighbor if it is smaller than its own component ID.
-
-This implementation uses a [delta iteration](iterations.html): Vertices that have not changed their component ID do not participate in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-~~~java
-// read vertex and edge data
-DataSet<Long> vertices = getVertexDataSet(env);
-DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());
-
-// assign the initial component IDs (equal to the vertex ID)
-DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
-
-// open a delta iteration
-DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
-        verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);
-
-// apply the step logic:
-DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
-        // join with the edges
-        .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
-        // select the minimum neighbor component ID
-        .groupBy(0).aggregate(Aggregations.MIN, 1)
-        // update if the component ID of the candidate is smaller
-        .join(iteration.getSolutionSet()).where(0).equalTo(0)
-        .flatMap(new ComponentIdFilter());
-
-// close the delta iteration (delta and new workset are identical)
-DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
-
-// emit result
-result.writeAsCsv(outputPath, "\n", " ");
-
-// User-defined functions
-
-public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
-
-    @Override
-    public Tuple2<T, T> map(T vertex) {
-        return new Tuple2<T, T>(vertex, vertex);
-    }
-}
-
-public static final class UndirectEdge
-                    implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-    Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
-
-    @Override
-    public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
-        invertedEdge.f0 = edge.f1;
-        invertedEdge.f1 = edge.f0;
-        out.collect(edge);
-        out.collect(invertedEdge);
-    }
-}
-
-public static final class NeighborWithComponentIDJoin
-                implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-    @Override
-    public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
-        return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
-    }
-}
-
-public static final class ComponentIdFilter
-                    implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
-                                            Tuple2<Long, Long>> {
-
-    @Override
-    public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
-                        Collector<Tuple2<Long, Long>> out) {
-        if (value.f0.f1 < value.f1.f1) {
-            out.collect(value.f0);
-        }
-    }
-}
-~~~
-
-The {% gh_link /flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java "ConnectedComponents program" %} implements the above example. It requires the following parameters to run: `--vertices <path> --edges <path> --output <path> --iterations <n>`.
-
-</div>
-<div data-lang="scala" markdown="1">
-
-~~~scala
-// set up execution environment
-val env = ExecutionEnvironment.getExecutionEnvironment
-
-// read vertex and edge data
-// assign the initial components (equal to the vertex id)
-val vertices = getVerticesDataSet(env).map { id => (id, id) }
-
-// make edges undirected by emitting, for each input edge, the edge itself and an inverted
-// version
-val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }
-
-// open a delta iteration
-val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
-  (s, ws) =>
-
-    // apply the step logic: join with the edges
-    val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
-      (edge._2, vertex._2)
-    }
-
-    // select the minimum neighbor
-    val minNeighbors = allNeighbors.groupBy(0).min(1)
-
-    // update if the component of the candidate is smaller
-    val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
-      (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
-        if (newVertex._2 < oldVertex._2) out.collect(newVertex)
-    }
-
-    // delta and new workset are identical
-    (updatedComponents, updatedComponents)
-}
-
-verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
-
-~~~
-
-The {% gh_link /flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala "ConnectedComponents program" %} implements the above example. It requires the following parameters to run: `--vertices <path> --edges <path> --output <path> --iterations <n>`.
-</div>
-</div>
-
-Input files are plain text files and must be formatted as follows:
-- Vertices are represented by IDs, separated by new-line characters.
-    * For example `"1\n2\n12\n42\n63\n"` gives five vertices with IDs (1), (2), (12), (42), and (63).
-- Edges are represented as pairs of vertex IDs, separated by a space character. Edges are separated by new-line characters:
-    * For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
-
-## Relational Query
-
-The Relational Query example assumes two tables, one with `orders` and the other with `lineitems`, as specified by the [TPC-H decision support benchmark](http://www.tpc.org/tpch/). TPC-H is a standard benchmark in the database industry. See below for instructions on how to generate the input data.
-
-The example implements the following SQL query.
-
-~~~sql
-SELECT l_orderkey, o_shippriority, SUM(l_extendedprice) AS revenue
-FROM orders, lineitem
-WHERE l_orderkey = o_orderkey
-    AND o_orderstatus = 'F'
-    AND YEAR(o_orderdate) > 1993
-    AND o_orderpriority LIKE '5%'
-GROUP BY l_orderkey, o_shippriority;
-~~~
-
-The Flink program implementing the above query looks as follows.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-~~~java
-// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
-DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
-// get lineitem data set: (orderkey, extendedprice)
-DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);
-
-// orders filtered by year: (orderkey, shippriority)
-DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
-        // filter orders
-        orders.filter(
-            new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
-                @Override
-                public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
-                    // status filter
-                    if(!t.f1.equals(STATUS_FILTER)) {
-                        return false;
-                    // year filter
-                    } else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
-                        return false;
-                    // order priority filter
-                    } else if(!t.f3.startsWith(OPRIO_FILTER)) {
-                        return false;
-                    }
-                    return true;
-                }
-            })
-        // project fields out that are no longer required
-        .project(0,4).types(Integer.class, Integer.class);
-
-// join orders with lineitems: (orderkey, shippriority, extendedprice)
-DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders =
-        ordersFilteredByYear.joinWithHuge(lineitems)
-                            .where(0).equalTo(0)
-                            .projectFirst(0,1).projectSecond(1)
-                            .types(Integer.class, Integer.class, Double.class);
-
-// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
-DataSet<Tuple3<Integer, Integer, Double>> priceSums =
-        // group by order and sum extendedprice
-        lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);
-
-// emit result
-priceSums.writeAsCsv(outputPath);
-~~~
-
-The {% gh_link /flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java "Relational Query program" %} implements the above query. It requires the following parameters to run: `--orders <path> --lineitem <path> --output <path>`.
-
-</div>
-<div data-lang="scala" markdown="1">
-Coming soon...
-
-The {% gh_link /flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala "Relational Query program" %} implements the above query. It requires the following parameters to run: `--orders <path> --lineitem <path> --output <path>`.
-
-</div>
-</div>
-
-The orders and lineitem files can be generated using the [TPC-H benchmark](http://www.tpc.org/tpch/) suite's data generator tool (DBGEN).
-Take the following steps to generate arbitrarily large input files for the provided Flink programs:
-
-1.  Download and unpack DBGEN
-2.  Make a copy of *makefile.suite* called *Makefile* and perform the following changes:
-
-~~~bash
-DATABASE = DB2
-MACHINE  = LINUX
-WORKLOAD = TPCH
-CC       = gcc
-~~~
-
-3.  Build DBGEN using *make*
-4.  Generate lineitem and orders relations using dbgen. A scale factor
-    (-s) of 1 results in a data set of about 1 GB.
-
-~~~bash
-./dbgen -T o -s 1
-~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/batch/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fault_tolerance.md b/docs/apis/batch/fault_tolerance.md
deleted file mode 100644
index 51a6b41..0000000
--- a/docs/apis/batch/fault_tolerance.md
+++ /dev/null
@@ -1,100 +0,0 @@
----
-title: "Fault Tolerance"
-
-# Sub-level navigation
-sub-nav-group: batch
-sub-nav-pos: 2
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Flink's fault tolerance mechanism recovers programs in the presence of failures and
-continues to execute them. Such failures include machine hardware failures, network failures,
-transient program failures, etc.
-
-* This will be replaced by the TOC
-{:toc}
-
-Batch Processing Fault Tolerance (DataSet API)
-----------------------------------------------
-
-Fault tolerance for programs in the *DataSet API* works by retrying failed executions.
-The number of times that Flink retries the execution before the job is declared failed is configurable
-via the *execution retries* parameter. A value of *0* effectively means that fault tolerance is deactivated.
-
-To activate fault tolerance, set the *execution retries* parameter to a value larger than zero. A common choice is a value
-of three.
-
-This example shows how to configure the execution retries for a Flink DataSet program.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setNumberOfExecutionRetries(3);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = ExecutionEnvironment.getExecutionEnvironment
-env.setNumberOfExecutionRetries(3)
-{% endhighlight %}
-</div>
-</div>
-
-
-You can also define default values for the number of execution retries and the retry delay in the `flink-conf.yaml`:
-
-~~~
-execution-retries.default: 3
-~~~
-
-
-Retry Delays
-------------
-
-Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start
-immediately, but only after a certain delay.
-
-Delaying the retries can be helpful when the program interacts with external systems where, for example, connections or pending transactions should time out before re-execution is attempted.
-
-You can set the retry delay for each program as follows (the sample shows the DataStream API - the DataSet API works similarly):
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env.getConfig().setExecutionRetryDelay(5000); // 5000 milliseconds delay
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-env.getConfig.setExecutionRetryDelay(5000) // 5000 milliseconds delay
-{% endhighlight %}
-</div>
-</div>
-
-You can also define the default value for the retry delay in the `flink-conf.yaml`:
-
-~~~
-execution-retries.delay: 10 s
-~~~
-
-{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/batch/fig/LICENSE.txt
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/LICENSE.txt b/docs/apis/batch/fig/LICENSE.txt
deleted file mode 100644
index 35b8673..0000000
--- a/docs/apis/batch/fig/LICENSE.txt
+++ /dev/null
@@ -1,17 +0,0 @@
-All image files in the folder and its subfolders are
-licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/batch/fig/iterations_delta_iterate_operator.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_delta_iterate_operator.png b/docs/apis/batch/fig/iterations_delta_iterate_operator.png
deleted file mode 100644
index 470485a..0000000
Binary files a/docs/apis/batch/fig/iterations_delta_iterate_operator.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png b/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png
deleted file mode 100644
index 15f2b54..0000000
Binary files a/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/batch/fig/iterations_iterate_operator.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_iterate_operator.png b/docs/apis/batch/fig/iterations_iterate_operator.png
deleted file mode 100644
index aaf4158..0000000
Binary files a/docs/apis/batch/fig/iterations_iterate_operator.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/batch/fig/iterations_iterate_operator_example.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_iterate_operator_example.png b/docs/apis/batch/fig/iterations_iterate_operator_example.png
deleted file mode 100644
index be4841c..0000000
Binary files a/docs/apis/batch/fig/iterations_iterate_operator_example.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/batch/fig/iterations_supersteps.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_supersteps.png b/docs/apis/batch/fig/iterations_supersteps.png
deleted file mode 100644
index 331dbc7..0000000
Binary files a/docs/apis/batch/fig/iterations_supersteps.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/844c874b/docs/apis/batch/hadoop_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/hadoop_compatibility.md b/docs/apis/batch/hadoop_compatibility.md
deleted file mode 100644
index 2bea8db..0000000
--- a/docs/apis/batch/hadoop_compatibility.md
+++ /dev/null
@@ -1,249 +0,0 @@
----
-title: "Hadoop Compatibility"
-is_beta: true
-# Sub-level navigation
-sub-nav-group: batch
-sub-nav-pos: 7
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows
-reusing code that was implemented for Hadoop MapReduce.
-
-You can:
-
-- use Hadoop's `Writable` [data types](index.html#data-types) in Flink programs.
-- use any Hadoop `InputFormat` as a [DataSource](index.html#data-sources).
-- use any Hadoop `OutputFormat` as a [DataSink](index.html#data-sinks).
-- use a Hadoop `Mapper` as [FlatMapFunction](dataset_transformations.html#flatmap).
-- use a Hadoop `Reducer` as [GroupReduceFunction](dataset_transformations.html#groupreduce-on-grouped-dataset).
-
-This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the
-[Connecting to other systems]({{ site.baseurl }}/apis/connectors.html) guide for reading from Hadoop supported file systems.
-
-* This will be replaced by the TOC
-{:toc}
-
-### Project Configuration
-
-Support for Hadoop input/output formats is part of the `flink-java` and
-`flink-scala` Maven modules that are always required when writing Flink jobs.
-The code is located in `org.apache.flink.api.java.hadoop` and
-`org.apache.flink.api.scala.hadoop` in an additional sub-package for the
-`mapred` and `mapreduce` API.
-
-Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility`
-Maven module.
-This code resides in the `org.apache.flink.hadoopcompatibility`
-package.
-
-Add the following dependency to your `pom.xml` if you want to reuse Mappers
-and Reducers.
-
-~~~xml
-<dependency>
-	<groupId>org.apache.flink</groupId>
-	<artifactId>flink-hadoop-compatibility{{ site.scala_version_suffix }}</artifactId>
-	<version>{{site.version}}</version>
-</dependency>
-~~~
-
-### Using Hadoop Data Types
-
-Flink supports all Hadoop `Writable` and `WritableComparable` data types
-out-of-the-box. You do not need to include the Hadoop Compatibility dependency
-if you only want to use your Hadoop data types. See the
-[Programming Guide](index.html#data-types) for more details.
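-
-As a minimal sketch, the following program uses Hadoop's `Text` and `LongWritable` directly as Flink data types (the inline data is hypothetical, and the usual `org.apache.hadoop.io` imports are assumed):
-
-~~~java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-// a small data set of Hadoop Writable key/value pairs
-DataSet<Tuple2<Text, LongWritable>> hadoopTyped = env.fromElements(
-        new Tuple2<Text, LongWritable>(new Text("flink"), new LongWritable(3L)),
-        new Tuple2<Text, LongWritable>(new Text("hadoop"), new LongWritable(1L)));
-
-// Writable types behave like any other Flink data type,
-// e.g., as grouping keys or inside user-defined functions
-DataSet<Tuple2<Text, LongWritable>> filtered = hadoopTyped
-        .filter(new FilterFunction<Tuple2<Text, LongWritable>>() {
-            @Override
-            public boolean filter(Tuple2<Text, LongWritable> t) {
-                return t.f1.get() > 1L;
-            }
-        });
-
-filtered.print();
-~~~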
-
-### Using Hadoop InputFormats
-
-Hadoop input formats can be used to create a data source by using
-one of the methods `readHadoopFile` or `createHadoopInput` of the
-`ExecutionEnvironment`. The former is used for input formats derived
-from `FileInputFormat` while the latter has to be used for general purpose
-input formats.
-
-The resulting `DataSet` contains 2-tuples where the first field
-is the key and the second field is the value retrieved from the Hadoop
-InputFormat.
-
-The following example shows how to use Hadoop's `TextInputFormat`.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-~~~java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-DataSet<Tuple2<LongWritable, Text>> input =
-    env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath);
-
-// Do something with the data.
-[...]
-~~~
-
-</div>
-<div data-lang="scala" markdown="1">
-
-~~~scala
-val env = ExecutionEnvironment.getExecutionEnvironment
-
-val input: DataSet[(LongWritable, Text)] =
-  env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
-
-// Do something with the data.
-[...]
-~~~
-
-</div>
-
-</div>
-
-### Using Hadoop OutputFormats
-
-Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
-that implements `org.apache.hadoop.mapred.OutputFormat` or extends
-`org.apache.hadoop.mapreduce.OutputFormat` is supported.
-The OutputFormat wrapper expects its input data to be a DataSet containing
-2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.
-
-The following example shows how to use Hadoop's `TextOutputFormat`.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-~~~java
-// Obtain the result we want to emit
-DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
-
-// Set up the Hadoop TextOutputFormat.
-HadoopOutputFormat<Text, IntWritable> hadoopOF =
-  // create the Flink wrapper.
-  new HadoopOutputFormat<Text, IntWritable>(
-    // set the Hadoop OutputFormat and specify the job.
-    new TextOutputFormat<Text, IntWritable>(), job
-  );
-hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
-TextOutputFormat.setOutputPath(job, new Path(outputPath));
-
-// Emit data using the Hadoop TextOutputFormat.
-hadoopResult.output(hadoopOF);
-~~~
-
-</div>
-<div data-lang="scala" markdown="1">
-
-~~~scala
-// Obtain your result to emit.
-val hadoopResult: DataSet[(Text, IntWritable)] = [...]
-
-val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
-  new TextOutputFormat[Text, IntWritable],
-  new JobConf)
-
-hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
-FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
-
-hadoopResult.output(hadoopOF)
-
-
-~~~
-
-</div>
-
-</div>
-
-### Using Hadoop Mappers and Reducers
-
-Hadoop Mappers are semantically equivalent to Flink's [FlatMapFunctions](dataset_transformations.html#flatmap) and Hadoop Reducers are equivalent to Flink's [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset). Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reducer interfaces of Hadoop's mapred API (`org.apache.hadoop.mapred`) are supported.
-
-The wrappers take a `DataSet<Tuple2<KEYIN,VALUEIN>>` as input and produce a `DataSet<Tuple2<KEYOUT,VALUEOUT>>` as output where `KEYIN` and `KEYOUT` are the keys and `VALUEIN` and `VALUEOUT` are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with a Combiner (`HadoopReduceCombineFunction`) and without a Combiner (`HadoopReduceFunction`). The wrappers accept an optional `JobConf` object to configure the Hadoop Mapper or Reducer.
-
-Flink's function wrappers are
-
-- `org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction`,
-- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction`, and
-- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction`.
-
-They can be used as regular Flink [FlatMapFunctions](dataset_transformations.html#flatmap) or [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset).
-
-The following example shows how to use Hadoop `Mapper` and `Reducer` functions.
-
-~~~java
-// Obtain data to process somehow.
-DataSet<Tuple2<Text, LongWritable>> text = [...]
-
-DataSet<Tuple2<Text, LongWritable>> result = text
-  // use Hadoop Mapper (Tokenizer) as MapFunction
-  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
-    new Tokenizer()
-  ))
-  .groupBy(0)
-  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
-  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
-    new Counter(), new Counter()
-  ));
-~~~
-
-**Please note:** The Reducer wrapper works on groups as defined by Flink's [groupBy()](dataset_transformations.html#transformations-on-grouped-dataset) operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the `JobConf`.
-
-### Complete Hadoop WordCount Example
-
-The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
-
-~~~java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-// Set up the Hadoop TextInputFormat.
-Job job = Job.getInstance();
-HadoopInputFormat<LongWritable, Text> hadoopIF =
-  new HadoopInputFormat<LongWritable, Text>(
-    new TextInputFormat(), LongWritable.class, Text.class, job
-  );
-TextInputFormat.addInputPath(job, new Path(inputPath));
-
-// Read data using the Hadoop TextInputFormat.
-DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
-
-DataSet<Tuple2<Text, LongWritable>> result = text
-  // use Hadoop Mapper (Tokenizer) as MapFunction
-  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
-    new Tokenizer()
-  ))
-  .groupBy(0)
-  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
-  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
-    new Counter(), new Counter()
-  ));
-
-// Set up the Hadoop TextOutputFormat.
-HadoopOutputFormat<Text, LongWritable> hadoopOF =
-  new HadoopOutputFormat<Text, LongWritable>(
-    new TextOutputFormat<Text, LongWritable>(), job
-  );
-hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
-TextOutputFormat.setOutputPath(job, new Path(outputPath));
-
-// Emit data using the Hadoop TextOutputFormat.
-result.output(hadoopOF);
-
-// Execute Program
-env.execute("Hadoop WordCount");
-~~~