Posted to commits@flink.apache.org by rm...@apache.org on 2014/12/04 10:00:36 UTC

incubator-flink git commit: [FLINK-1157] Document TaskManager slots and minor fixes.

Repository: incubator-flink
Updated Branches:
  refs/heads/master 858d1bccf -> a677c7723


[FLINK-1157] Document TaskManager slots and minor fixes.

- Move some background on the most important configuration values into the configuration guide
- Remove the term "UDF" because it is often confused with UDFs in SQL
- Fix some typos


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a677c772
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a677c772
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a677c772

Branch: refs/heads/master
Commit: a677c7723db298343681ccdfb3135373a9b4f7e6
Parents: 858d1bc
Author: Robert Metzger <rm...@apache.org>
Authored: Mon Dec 1 15:38:57 2014 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Dec 4 10:00:05 2014 +0100

----------------------------------------------------------------------
 docs/cluster_execution.md |  2 +-
 docs/cluster_setup.md     | 72 +++++++------------------------
 docs/config.md            | 97 ++++++++++++++++++++++++++++++++++++++----
 docs/setup_quickstart.md  | 27 ++++++++----
 docs/streaming_guide.md   |  2 +-
 5 files changed, 125 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a677c772/docs/cluster_execution.md
----------------------------------------------------------------------
diff --git a/docs/cluster_execution.md b/docs/cluster_execution.md
index d52d3f0..c40838f 100644
--- a/docs/cluster_execution.md
+++ b/docs/cluster_execution.md
@@ -58,7 +58,7 @@ public static void main(String[] args) throws Exception {
 }
 ~~~
 
-Note that the program contains custom UDFs and hence requires a JAR file with
+Note that the program contains custom user code and hence requires a JAR file with
 the classes of the code attached. The constructor of the remote environment
 takes the path(s) to the JAR file(s).
 
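For illustration only (not part of this commit): a minimal Java sketch of the remote environment described above. The host name, port, and JAR path are placeholders.

~~~java
import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteJob {
    public static void main(String[] args) throws Exception {
        // The remote environment submits the program to the given JobManager;
        // the attached JAR file(s) must contain the classes of the user code.
        ExecutionEnvironment env = ExecutionEnvironment
            .createRemoteEnvironment("flink-master", 6123, "/path/to/your-program.jar");
        // ... define the program on `env` and call env.execute() as usual ...
    }
}
~~~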

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a677c772/docs/cluster_setup.md
----------------------------------------------------------------------
diff --git a/docs/cluster_setup.md b/docs/cluster_setup.md
index c902bea..b3c6599 100644
--- a/docs/cluster_setup.md
+++ b/docs/cluster_setup.md
@@ -140,14 +140,15 @@ echo "PermitUserEnvironment yes" >> /etc/ssh/sshd_config
 ## Hadoop Distributed Filesystem (HDFS) Setup
 
 The Flink system currently uses the Hadoop Distributed Filesystem (HDFS)
-to read and write data in a distributed fashion.
+to read and write data in a distributed fashion. It is also possible to use
+Flink without HDFS or any other distributed file system.
 
 Make sure to have a running HDFS installation. The following instructions are
 just a general overview of some required settings. Please consult one of the
 many installation guides available online for more detailed instructions.
 
-**Note that the following instructions are based on Hadoop 1.2 and might differ
-**for Hadoop 2.
+__Note that the following instructions are based on Hadoop 1.2 and might differ 
+for Hadoop 2.__
 
 ### Downloading, Installing, and Configuring HDFS
 
@@ -297,62 +298,19 @@ The Flink directory must be available on every worker under the same
 path. Similarly as for HDFS, you can use a shared NFS directory, or copy the
 entire Flink directory to every worker node.
 
-### Configuring the Network Buffers
-
-Network buffers are a critical resource for the communication layers. They are
-used to buffer records before transmission over a network, and to buffer
-incoming data before dissecting it into records and handing them to the
-application. A sufficient number of network buffers are critical to achieve a
-good throughput.
-
-In general, configure the task manager to have so many buffers that each logical
-network connection on you expect to be open at the same time has a dedicated
-buffer. A logical network connection exists for each point-to-point exchange of
-data over the network, which typically happens at repartitioning- or
-broadcasting steps. In those, each parallel task inside the TaskManager has to
-be able to talk to all other parallel tasks. Hence, the required number of
-buffers on a task manager is *total-degree-of-parallelism* (number of targets)
-\* *intra-node-parallelism* (number of sources in one task manager) \* *n*.
-Here, *n* is a constant that defines how many repartitioning-/broadcasting steps
-you expect to be active at the same time.
-
-Since the *intra-node-parallelism* is typically the number of cores, and more
-than 4 repartitioning or broadcasting channels are rarely active in parallel, it
-frequently boils down to *\#cores\^2\^* \* *\#machines* \* 4. To support for
-example a cluster of 20 8-core machines, you should use roughly 5000 network
-buffers for optimal throughput.
-
-Each network buffer is by default 64 KiBytes large. In the above example, the
-system would allocate roughly 300 MiBytes for network buffers.
-
-The number and size of network buffers can be configured with the following
-parameters:
-
-- `taskmanager.network.numberOfBuffers`, and
-- `taskmanager.network.bufferSizeInBytes`.
-
-### Configuring Temporary I/O Directories
-
-Although Flink aims to process as much data in main memory as possible,
-it is not uncommon that  more data needs to be processed than memory is
-available. Flink's runtime is designed to  write temporary data to disk
-to handle these situations.
-
-The `taskmanager.tmp.dirs` parameter specifies a list of directories into which
-Flink writes temporary files. The paths of the directories need to be
-separated by ':' (colon character).  Flink will concurrently write (or
-read) one temporary file to (from) each configured directory.  This way,
-temporary I/O can be evenly distributed over multiple independent I/O devices
-such as hard disks to improve performance.  To leverage fast I/O devices (e.g.,
-SSD, RAID, NAS), it is possible to specify a directory multiple times.
-
-If the `taskmanager.tmp.dirs` parameter is not explicitly specified,
-Flink writes temporary data to the temporary  directory of the operating
-system, such as */tmp* in Linux systems.
-
 Please see the [configuration page](config.html) for details and additional
 configuration options.
 
+In particular, 
+
+ * the amount of available memory per TaskManager (`taskmanager.heap.mb`),
+ * the number of available CPUs per machine (`taskmanager.numberOfTaskSlots`),
+ * the total number of CPUs in the cluster (`parallelization.degree.default`) and
+ * the temporary directories (`taskmanager.tmp.dirs`)
+
+are very important configuration values.
+
+
 ### Starting Flink
 
 The following script starts a JobManager on the local node and connects via
@@ -366,3 +324,5 @@ Assuming that you are on the master node and inside the Flink directory:
 ~~~bash
 bin/start-cluster.sh
 ~~~
+
+To stop Flink, there is also a `stop-cluster.sh` script.
\ No newline at end of file
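To make the list above concrete, here is a sketch of the corresponding `conf/flink-conf.yaml` entries for a hypothetical setup of 10 machines with 8 cores and 16 GB of RAM each. All numbers are illustrative, not recommendations from this commit.

~~~yaml
# JVM heap per TaskManager: total RAM minus 1-2 GB for the operating system
taskmanager.heap.mb: 14000
# One slot per CPU core of the machine
taskmanager.numberOfTaskSlots: 8
# Default parallelism: total number of CPU cores in the cluster (10 * 8)
parallelization.degree.default: 80
# Colon-separated list of temporary directories, ideally on separate disks
taskmanager.tmp.dirs: /data1/tmp:/data2/tmp
~~~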

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a677c772/docs/config.md
----------------------------------------------------------------------
diff --git a/docs/config.md b/docs/config.md
index 01fdc23..1c3906e 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -21,6 +21,9 @@ with format `key: value`.
 The system and run scripts parse the config at startup time. Changes to the configuration
 file require restarting the Flink JobManager and TaskManagers.
 
+The configuration files for the TaskManagers can be different; Flink does not assume
+uniform machines in the cluster.
+
 
 ## Common Options
 
@@ -35,27 +38,29 @@ master/coordinator of the distributed system (DEFAULT: localhost).
 
 - `jobmanager.rpc.port`: The port number of the JobManager (DEFAULT: 6123).
 
-- `jobmanager.heap.mb`: JVM heap size (in megabytes) for the JobManager
-(DEFAULT: 256).
+- `jobmanager.heap.mb`: JVM heap size (in megabytes) for the JobManager. You may have to increase the heap size for the JobManager if you are running
+very large applications (with many operators), or if you are keeping a long history of them.
 
 - `taskmanager.heap.mb`: JVM heap size (in megabytes) for the TaskManagers,
 which are the parallel workers of the system. In
 contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and
 user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager
 (including sorting/hashing/caching), so this value should be as
-large as possible (DEFAULT: 512). On YARN setups, this value is automatically
-configured to the size of the TaskManager's YARN container, minus a
-certain tolerance value.
+large as possible. If the cluster is exclusively running Flink,
+the total amount of available memory per machine minus some memory for the 
+operating system (maybe 1-2 GB) is a good value.
+On YARN setups, this value is automatically configured to the size of 
+the TaskManager's YARN container, minus a certain tolerance value.
 
 - `taskmanager.numberOfTaskSlots`: The number of parallel operator or
-UDF instances that a single TaskManager can run (DEFAULT: 1).
+user function instances that a single TaskManager can run (DEFAULT: 1).
 If this value is larger than 1, a single TaskManager takes multiple instances of
 a function or operator. That way, the TaskManager can utilize multiple CPU cores,
 but at the same time, the available memory is divided between the different
 operator or function instances.
 This value is typically proportional to the number of physical CPU cores that
 the TaskManager's machine has (e.g., equal to the number of cores, or half the
-number of cores).
+number of cores). [More about task slots](config.html#configuring-taskmanager-processing-slots).
 
 - `parallelization.degree.default`: The default degree of parallelism to use for
 programs that have no degree of parallelism specified. (DEFAULT: 1). For
@@ -157,7 +162,7 @@ large as possible (DEFAULT: 512). On YARN setups, this value is automatically
 configured to the size of the TaskManager's YARN container, minus a
 certain tolerance value.
 - `taskmanager.numberOfTaskSlots`: The number of parallel operator or
-UDF instances that a single TaskManager can run (DEFAULT: 1).
+user function instances that a single TaskManager can run (DEFAULT: 1).
 If this value is larger than 1, a single TaskManager takes multiple instances of
 a function or operator. That way, the TaskManager can utilize multiple CPU cores,
 but at the same time, the available memory is divided between the different
@@ -266,3 +271,79 @@ So if `yarn.am.rpc.port` is configured to `10245` and the session's application
 
 - `yarn.am.rpc.port`: The port that is being opened by the Application Master (AM) to 
 let the YARN client connect for an RPC service. (DEFAULT: Port 10245)
+
+
+## Background
+
+### Configuring the Network Buffers
+
+Network buffers are a critical resource for the communication layers. They are
+used to buffer records before transmission over a network, and to buffer
+incoming data before dissecting it into records and handing them to the
+application. A sufficient number of network buffers is critical to achieve a
+good throughput.
+
+In general, configure the task manager to have enough buffers that each logical
+network connection you expect to be open at the same time has a dedicated
+buffer. A logical network connection exists for each point-to-point exchange of
+data over the network, which typically happens at repartitioning- or
+broadcasting steps. In those, each parallel task inside the TaskManager has to
+be able to talk to all other parallel tasks. Hence, the required number of
+buffers on a task manager is *total-degree-of-parallelism* (number of targets)
+\* *intra-node-parallelism* (number of sources in one task manager) \* *n*.
+Here, *n* is a constant that defines how many repartitioning-/broadcasting steps
+you expect to be active at the same time.
+
+Since the *intra-node-parallelism* is typically the number of cores, and more
+than 4 repartitioning or broadcasting channels are rarely active in parallel, it
+frequently boils down to *\#cores\^2\^* \* *\#machines* \* 4. For example, to
+support a cluster of 20 8-core machines, you should use roughly 5000 network
+buffers for optimal throughput.
+
+Each network buffer has a default size of 32 KiBytes. In the above example, the
+system would allocate roughly 160 MiBytes for network buffers.
+
+The number and size of network buffers can be configured with the following
+parameters:
+
+- `taskmanager.network.numberOfBuffers`, and
+- `taskmanager.network.bufferSizeInBytes`.
+
+### Configuring Temporary I/O Directories
+
+Although Flink aims to process as much data in main memory as possible,
+it is not uncommon that more data needs to be processed than memory is
+available. Flink's runtime is designed to write temporary data to disk
+to handle these situations.
+
+The `taskmanager.tmp.dirs` parameter specifies a list of directories into which
+Flink writes temporary files. The paths of the directories need to be
+separated by ':' (colon character). Flink will concurrently write (or
+read) one temporary file to (from) each configured directory. This way,
+temporary I/O can be evenly distributed over multiple independent I/O devices
+such as hard disks to improve performance. To leverage fast I/O devices (e.g.,
+SSD, RAID, NAS), it is possible to specify a directory multiple times.
+
+If the `taskmanager.tmp.dirs` parameter is not explicitly specified,
+Flink writes temporary data to the temporary directory of the operating
+system, such as */tmp* in Linux systems.
+
+
+### Configuring TaskManager processing slots
+
+A processing slot allows Flink to execute a distributed DataSet transformation, such as a
+data source or a map-transformation.
+
+Each Flink TaskManager provides processing slots in the cluster. The number of slots
+is typically proportional to the number of available CPU cores __of each__ TaskManager.
+As a general recommendation, the number of available CPU cores is a good default for 
+`taskmanager.numberOfTaskSlots`.
+
+When starting a Flink application, users can supply the default number of slots to use for that job.
+The corresponding command line parameter is called `-p` (for parallelism). In addition, it is possible
+to [set the number of slots in the programming APIs](programming_guide.html#parallel-execution) for 
+the whole application and individual operators.
+
+Flink currently schedules an application to slots by "filling" them up.
+If the cluster has 20 machines with 2 slots each (40 slots in total) but the application is running
+with a parallelism of 20, only 10 machines will process data.
\ No newline at end of file
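Sticking with the example from the hunk above (20 machines with 8 cores each), the formula gives 8^2 * 20 * 4 = 5120 buffers of 32 KiB each, i.e. roughly 160 MiB in total. A sketch of the corresponding `flink-conf.yaml` entries (numbers illustrative):

~~~yaml
# #cores^2 * #machines * 4 = 8^2 * 20 * 4 = 5120
taskmanager.network.numberOfBuffers: 5120
# 32 KiB per buffer (the documented default): 5120 * 32 KiB is about 160 MiB
taskmanager.network.bufferSizeInBytes: 32768
~~~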

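For the slot-filling behaviour described at the end of the hunk above: with 20 machines and 2 slots each, a job must be submitted with a parallelism of 40 to make all machines process data. A hedged sketch using the `-p` flag mentioned in the text (JAR path and arguments are placeholders):

~~~bash
# Request parallelism 40 so that all 40 slots (20 machines * 2 slots) are used
bin/flink run -p 40 ./path/to/your-program.jar <program arguments>
~~~
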
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a677c772/docs/setup_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/setup_quickstart.md b/docs/setup_quickstart.md
index c2a30cc..61128c3 100644
--- a/docs/setup_quickstart.md
+++ b/docs/setup_quickstart.md
@@ -30,7 +30,6 @@ Download the ready to run binary package. Choose the Flink distribution that __m
 
 
 ## Start
-You are almost done.
   
 1. Go to the download directory.
 2. Unpack the downloaded archive.
@@ -38,10 +37,10 @@ You are almost done.
 
 
 ~~~bash
-$ cd ~/Downloads              # Go to download directory
-$ tar xzf flink-*.tgz  # Unpack the downloaded archive
-$ cd flink
-$ bin/start-local.sh          # Start Flink
+$ cd ~/Downloads        # Go to download directory
+$ tar xzf flink-*.tgz   # Unpack the downloaded archive
+$ cd flink-{{site.FLINK_VERSION_STABLE}}
+$ bin/start-local.sh    # Start Flink
 ~~~
 
 Check the __JobManager's web frontend__ at [http://localhost:8081](http://localhost:8081) and make
@@ -61,10 +60,7 @@ Run the __Word Count example__ to see Flink at work.
 * __Start the example program__:
   
   ~~~bash
-  $ bin/flink run \
-    --jarfile ./examples/flink-java-examples-{{site.FLINK_VERSION_STABLE}}-WordCount.jar \
-
-    --arguments file://`pwd`/hamlet.txt file://`pwd`/wordcount-result.txt
+  $ bin/flink run ./examples/flink-java-examples-{{site.FLINK_VERSION_STABLE}}-WordCount.jar file://`pwd`/hamlet.txt file://`pwd`/wordcount-result.txt
   ~~~
 
 * You will find a file called __wordcount-result.txt__ in your current directory.
@@ -113,6 +109,19 @@ configuration files, which need to be accessible at the same path on all machine
 </div>
 </div>
 
+For Flink to run efficiently, a few configuration values need to be set. Have a look at the
+[Configuration](config.html) section of the documentation for other available configuration options.
+
+In particular, 
+
+ * the amount of available memory per TaskManager (`taskmanager.heap.mb`),
+ * the number of available CPUs per machine (`taskmanager.numberOfTaskSlots`),
+ * the total number of CPUs in the cluster (`parallelization.degree.default`) and
+ * the temporary directories (`taskmanager.tmp.dirs`)
+
+
+are very important configuration values.
+
 ## Flink on YARN
 You can easily deploy Flink on your existing __YARN cluster__. 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a677c772/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 1c6cb2e..264140a 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -149,7 +149,7 @@ Usage: `operator.setParallelism(1)`
 
 ### Sources
 
-The user can connect to data streams by the different implemenations of `DataStreamSource` using methods provided by the `StreamExecutionEnvironment`. There are several predefined ones similar to the ones of the batch API and some streaming specific ones like:
+The user can connect to data streams by the different implementations of `DataStreamSource` using methods provided by the `StreamExecutionEnvironment`. There are several predefined ones similar to the ones of the batch API and some streaming specific ones like:
 
  * `socketTextStream(hostname, port)`
  * `readTextStream(filepath)`
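To illustrate the sources listed above, a small Java sketch; the host and port are placeholders, and the imports assume the streaming API packages of this Flink version:

~~~java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read a stream of text lines from a socket (host and port are placeholders)
        DataStream<String> lines = env.socketTextStream("localhost", 9999);
        lines.print();
        env.execute();
    }
}
~~~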