Posted to commits@flink.apache.org by rm...@apache.org on 2014/11/13 10:08:45 UTC

incubator-flink git commit: [FLINK-1230] Add documentation and an example for collection-based execution

Repository: incubator-flink
Updated Branches:
  refs/heads/master b3c290f68 -> b253cb2de


[FLINK-1230] Add documentation and an example for collection-based execution

This closes #195


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b253cb2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b253cb2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b253cb2d

Branch: refs/heads/master
Commit: b253cb2dec3fbf71782528af22d87f3128a1ffbb
Parents: b3c290f
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Nov 11 15:49:01 2014 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Nov 13 10:07:55 2014 +0100

----------------------------------------------------------------------
 docs/local_execution.md                         |  79 +++++++-------
 .../java/misc/CollectionExecutionExample.java   | 103 +++++++++++++++++++
 2 files changed, 138 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b253cb2d/docs/local_execution.md
----------------------------------------------------------------------
diff --git a/docs/local_execution.md b/docs/local_execution.md
index 48d9ae7..e0aa22a 100644
--- a/docs/local_execution.md
+++ b/docs/local_execution.md
@@ -2,20 +2,22 @@
 title:  "Local Execution"
 ---
 
-## Local Execution/Debugging
+## Local Execution
 
 Flink can run on a single machine, even in a single Java Virtual Machine. This allows users to test and debug Flink programs locally. This section gives an overview of the local execution mechanisms.
 
-**NOTE:** Please also refer to the [debugging section](java_api_guide.html#debugging) in the Java API documentation for a guide to testing and local debugging utilities in the Java API.
+The local environments and executors allow you to run Flink programs in a local Java Virtual Machine, or within any JVM as part of existing programs. Most examples can be launched locally by simply hitting the "Run" button of your IDE.
 
-The local environments and executors allow you to run Flink programs in local Java Virtual Machine, or with within any JVM as part of existing programs. Most examples can be launched locally by simply hitting the "Run" button of your IDE.
 
-If you are running Flink programs locally, you can also debug your program like any other Java program. You can either use `System.out.println()` to write out some internal variables or you can use the debugger. It is possible to set breakpoints within `map()`, `reduce()` and all the other methods.
+Flink supports two different kinds of local execution. The `LocalExecutionEnvironment` starts the full Flink runtime, including a JobManager and a TaskManager. This includes memory management and all the internal algorithms that are executed in cluster mode.
 
-The `JobExecutionResult` object, which is returned after the execution finished, contains the program runtime and the accumulator results.
+The `CollectionEnvironment` executes the Flink program on Java collections. This mode does not start the full Flink runtime, so execution is low-overhead and lightweight. For example, a `DataSet.map()` transformation is executed by applying the `map()` function to all elements in a Java list.
 
-*Note:* The local execution environments do not start any web frontend to monitor the execution.
 
+## Debugging
+
+If you are running Flink programs locally, you can also debug your program like any other Java program. You can either use `System.out.println()` to write out some internal variables or you can use the debugger. It is possible to set breakpoints within `map()`, `reduce()` and all the other methods.
+Please also refer to the [debugging section](programming_guide.html#debugging) in the Java API documentation for a guide to testing and local debugging utilities in the Java API.
 
 ## Maven Dependency
 
@@ -51,56 +53,45 @@ public static void main(String[] args) throws Exception {
         })
         .writeAsText("file:///path/to/result");
 
-    env.execute();
+    JobExecutionResult res = env.execute();
 }
 ~~~
 
+The `JobExecutionResult` object, which is returned after the execution finished, contains the program runtime and the accumulator results.
 
-## Local Executor
-
-The *LocalExecutor* is similar to the local environment, but it takes a *Plan* object, which describes the program as a single executable unit. The *LocalExecutor* is typically used with the Scala API. 
-
-The following code shows how you would use the `LocalExecutor` with the Wordcount example for Scala Programs:
+*Note:* The local execution environments do not start any web frontend to monitor the execution.
 
-~~~scala
-public static void main(String[] args) throws Exception {
-    val input = TextFile("hdfs://path/to/file")
+## Collection Environment
 
-    val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } }
-    val counts = words groupBy { x => x } count()
+Executing programs on Java collections with the `CollectionEnvironment` is a low-overhead approach for running Flink programs. Typical use cases for this mode are automated tests, debugging, and code reuse.
 
-    val output = counts.write(wordsOutput, CsvOutputFormat())
-  
-    val plan = new ScalaPlan(Seq(output), "Word Count")
-    LocalExecutor.executePlan(p);
-}
-~~~
+Users can also reuse algorithms implemented for batch processing in more interactive cases. For example, a slightly modified variant of a Flink program could be used in a Java application server to process incoming requests.
 
-
-## LocalDistributedExecutor
-
-Flink also offers a `LocalDistributedExecutor` which starts multiple TaskManagers within one JVM. The standard `LocalExecutor` starts one JobManager and one TaskManager in one JVM.
-With the `LocalDistributedExecutor` you can define the number of TaskManagers to start. This is useful for debugging network related code and more of a developer tool than a user tool.
+**Skeleton for Collection-based execution**
 
 ~~~java
 public static void main(String[] args) throws Exception {
-    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-    DataSet<String> data = env.readTextFile("hdfs://path/to/file");
-
-    data
-        .filter(new FilterFunction<String>() {
-            public boolean filter(String value) {
-                return value.startsWith("http://");
-            }
-        })
-        .writeAsText("hdfs://path/to/result");
-
-    Plan p = env.createProgramPlan();
-    LocalDistributedExecutor lde = new LocalDistributedExecutor();
-    lde.startNephele(2); // start two TaskManagers
-    lde.run(p);
+    // initialize a new Collection-based execution environment
+    final ExecutionEnvironment env = new CollectionEnvironment();
+    
+    DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */);
+
+    /* Data Set transformations ... */
+
+    // retrieve the resulting elements into an ArrayList.
+    Collection<...> result = new ArrayList<...>();
+    resultDataSet.output(new LocalCollectionOutputFormat<...>(result));
+    
+    // kick off execution.
+    env.execute();
+    
+    // Do some work with the resulting ArrayList (=Collection).
+    for(... t : result) {
+        System.err.println("Result = "+t);
+    }
 }
 ~~~
 
+The `flink-java-examples` module contains a full example, called `CollectionExecutionExample`.
 
+Please note that collection-based execution of Flink programs is only possible on small data that fits into the JVM heap. Execution on collections is not multi-threaded; only one thread is used.
\ No newline at end of file
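
The documentation changes above describe how the `CollectionEnvironment` evaluates a `DataSet.map()` transformation: the `map()` function is applied to every element of a Java list, on a single thread, with all data held on the JVM heap. The following is a minimal plain-Java sketch of that evaluation model only. It is not Flink code and does not use the Flink API; the class `CollectionMapSketch` and its `executeMap` method are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical illustration (not Flink code) of collection-based evaluation
 * of a map transformation: sequential, single-threaded, in-heap.
 */
public class CollectionMapSketch {

    // Applies 'fn' to each element of 'input' in order, on the calling thread,
    // and collects the results into a new ArrayList held on the JVM heap.
    static <IN, OUT> List<OUT> executeMap(List<IN> input, Function<IN, OUT> fn) {
        List<OUT> output = new ArrayList<>(input.size());
        for (IN element : input) {
            output.add(fn.apply(element));
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("http://flink.apache.org", "hello", "http://example.org");
        // Prints [23, 5, 18]: the length of each input string, in input order.
        System.out.println(executeMap(lines, String::length));
    }
}
```

Because the loop runs on the caller's thread and materializes the full result list, the sketch also shows why this mode suits small, heap-sized inputs rather than large data sets.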

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b253cb2d/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
new file mode 100644
index 0000000..ff1a413
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.misc;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/** 
+ * This example shows how to use the collection-based execution of Flink.
+ * 
+ * Collection-based execution is a local mode that does not use the full Flink runtime.
+ * DataSet transformations are executed on Java collections.
+ * 
+ * See the "Local Execution" section in the documentation for more details: 
+ * 	http://flink.incubator.apache.org/docs/0.7-incubating/local_execution.html
+ * 
+ */
+public class CollectionExecutionExample {
+	
+	/**
+	 * POJO class representing a user
+	 */
+	public static class User {
+		public int userIdentifier;
+		public String name;
+		public User() {}
+		public User(int userIdentifier, String name) {
+			this.userIdentifier = userIdentifier; this.name = name;
+		}
+		public String toString() {
+			return "User{userIdentifier="+userIdentifier+" name="+name+"}";
+		}
+	}
+	
+	/**
+	 * POJO for an EMail.
+	 */
+	public static class EMail {
+		public int userId;
+		public String subject;
+		public String body;
+		public EMail() {}
+		public EMail(int userId, String subject, String body) {
+			this.userId = userId; this.subject = subject; this.body = body;
+		}
+		public String toString() {
+			return "eMail{userId="+userId+" subject="+subject+" body="+body+"}";
+		}
+		
+	}
+	public static void main(String[] args) throws Exception {
+		// initialize a new Collection-based execution environment
+		final ExecutionEnvironment env = new CollectionEnvironment();
+		
+		// create objects for users and emails
+		User[] usersArray = { new User(1, "Peter"), new User(2, "John"), new User(3, "Bill") };
+		EMail[] emailsArray = {new EMail(1, "Re: Meeting", "How about 1pm?"),
+							new EMail(1, "Re: Meeting", "Sorry, I'm not available"),
+							new EMail(3, "Re: Re: Project proposal", "Give me a few more days to think about it.")};
+		
+		// convert objects into a DataSet
+		DataSet<User> users = env.fromCollection(Arrays.asList(usersArray));
+		DataSet<EMail> emails = env.fromCollection(Arrays.asList(emailsArray));
+		
+		// join the two DataSets
+		DataSet<Tuple2<User,EMail>> joined = users.join(emails).where("userIdentifier").equalTo("userId");
+		
+		// retrieve the resulting Tuple2 elements into an ArrayList.
+		Collection<Tuple2<User,EMail>> result = new ArrayList<Tuple2<User,EMail>>(3);
+		joined.output(new LocalCollectionOutputFormat<Tuple2<User,EMail>>(result));
+		
+		// kick off execution.
+		env.execute();
+		
+		// Do some work with the resulting ArrayList (=Collection).
+		for(Tuple2<User, EMail> t : result) {
+			System.err.println("Result = "+t);
+		}
+	}
+}
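
To check what the join in `CollectionExecutionExample` produces without a Flink dependency, the following plain-Java sketch computes the same equi-join (`userIdentifier == userId`) as a simple hash join over in-memory lists. It is an illustration under stated assumptions, not Flink's join implementation: the class `EquiJoinSketch` and its `join` method are hypothetical, and each joined pair is rendered as a `name: body` string instead of a `Tuple2<User, EMail>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java illustration (not Flink's implementation) of the
 * equi-join in CollectionExecutionExample: each User is paired with every
 * EMail whose userId equals the user's userIdentifier.
 */
public class EquiJoinSketch {

    static class User {
        final int userIdentifier;
        final String name;

        User(int userIdentifier, String name) {
            this.userIdentifier = userIdentifier;
            this.name = name;
        }
    }

    static class EMail {
        final int userId;
        final String subject;
        final String body;

        EMail(int userId, String subject, String body) {
            this.userId = userId;
            this.subject = subject;
            this.body = body;
        }
    }

    // Returns one "name: body" string per matching (User, EMail) pair.
    static List<String> join(List<User> users, List<EMail> emails) {
        // Build phase: index users by their join key.
        Map<Integer, List<User>> usersById = new HashMap<>();
        for (User u : users) {
            usersById.computeIfAbsent(u.userIdentifier, k -> new ArrayList<>()).add(u);
        }
        // Probe phase: look up each email's key and emit one row per matching user.
        List<String> result = new ArrayList<>();
        for (EMail e : emails) {
            for (User u : usersById.getOrDefault(e.userId, Collections.emptyList())) {
                result.add(u.name + ": " + e.body);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
                new User(1, "Peter"), new User(2, "John"), new User(3, "Bill"));
        List<EMail> emails = Arrays.asList(
                new EMail(1, "Re: Meeting", "How about 1pm?"),
                new EMail(1, "Re: Meeting", "Sorry, I'm not available"),
                new EMail(3, "Re: Re: Project proposal", "Give me a few more days to think about it."));
        for (String row : join(users, emails)) {
            System.out.println(row);
        }
    }
}
```

With the sample data from the example, the sketch prints two rows for Peter (user 1) and one for Bill (user 3); John (user 2) has no emails and does not appear. The sketch emits rows in email order; that ordering is a property of this sketch, not something to rely on from Flink's join.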