You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2015/07/25 03:01:36 UTC

incubator-mrql git commit: [MRQL-77] Support single Flink jobs on YARN

Repository: incubator-mrql
Updated Branches:
  refs/heads/master 4c1fbcf74 -> 05feb9f65


[MRQL-77] Support single Flink jobs on YARN


Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/05feb9f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/05feb9f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/05feb9f6

Branch: refs/heads/master
Commit: 05feb9f6520335828e246ba1f07ca82e7300e10c
Parents: 4c1fbcf
Author: fegaras <fe...@cse.uta.edu>
Authored: Fri Jul 24 17:53:31 2015 -0500
Committer: fegaras <fe...@cse.uta.edu>
Committed: Fri Jul 24 17:53:31 2015 -0500

----------------------------------------------------------------------
 bin/mrql.flink                                  | 40 +++++++++++++------
 conf/mrql-env.sh                                | 13 +++---
 .../java/org/apache/mrql/FlinkEvaluator.gen     | 42 +++++++++++++-------
 3 files changed, 60 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/05feb9f6/bin/mrql.flink
----------------------------------------------------------------------
diff --git a/bin/mrql.flink b/bin/mrql.flink
index f3b4b37..135a9de 100755
--- a/bin/mrql.flink
+++ b/bin/mrql.flink
@@ -21,7 +21,6 @@
 #--------------------------------------------------------------------------------
 #
 # run Apache MRQL in Apache Flink mode
-#  (need to run $FLINK_HOME/bin/yarn-session.sh on a separate window first)
 #
 #--------------------------------------------------------------------------------
 
@@ -35,7 +34,7 @@ MRQL_JAR=`ls "$MRQL_HOME"/lib/mrql-flink-*.jar`
 FULL_JAR="/tmp/${USER}_mrql_flink.jar"
 CLASS_DIR="/tmp/${USER}_mrql_classes"
 
-export FLINK_HOME FLINK_JARS FLINK_MASTER FS_DEFAULT_NAME
+export FLINK_HOME FLINK_JARS FS_DEFAULT_NAME
 
 if [[ ($MRQL_JAR -nt $FULL_JAR) ]]; then
    rm -rf $CLASS_DIR
@@ -46,24 +45,41 @@ if [[ ($MRQL_JAR -nt $FULL_JAR) ]]; then
    $JAVA_HOME/bin/jar xf $GEN_JAR
    $JAVA_HOME/bin/jar xf $CORE_JAR
    $JAVA_HOME/bin/jar xf $MRQL_JAR
-   $JAVA_HOME/bin/jar xf $HDFS_JAR
    cd ..
    $JAVA_HOME/bin/jar cf $FULL_JAR -C $CLASS_DIR .
    popd > /dev/null
 fi
 
 if [ "$1" == "-dist" ]; then
-    if ([ -a $FLINK_HOME/conf/.yarn-jobmanager ]); then
-	export FLINK_MASTER=`cat $FLINK_HOME/conf/.yarn-jobmanager`
-    elif ([ -a $FLINK_HOME/conf/.yarn-properties ]); then
+    args=" -flink $*"
+    args=${args// /\!}
+    if ([ -a $FLINK_HOME/conf/.yarn-properties ]); then
 	. $FLINK_HOME/conf/.yarn-properties
-	export FLINK_MASTER=$jobManager
+	host=`echo $jobManager | cut -d : -f 1`
+	port=`echo $jobManager | cut -d : -f 2`
+    fi
+    if ! type nc > /dev/null; then
+	jobManager=
+    elif [ "$host" == "" ] || [ "$port" == "" ] || [ "`nc -z $host $port`" == "" ]; then
+	# there is no Flink cluster running (host:port is closed)
+	jobManager=
+    fi
+    export FLINK_MASTER=$jobManager
+    if [ "$jobManager" == "" ]; then
+	# there is no Flink cluster running on YARN, so run this as a single job on YARN
+	yn=$FLINK_SLOTS
+	ARGS=($*)
+	for (( i = 0; i < $#; i++ )); do
+            if [ "${ARGS[i]}" = "-nodes" ]; then
+		yn=$(( ${ARGS[i+1]} / $FLINK_SLOTS ))
+            fi
+	done
+	$FLINK_HOME/bin/flink run -m yarn-cluster -yn $yn -ys $FLINK_SLOTS -ytm $FLINK_TASK_MANAGER_MEMORY -c org.apache.mrql.Main $FULL_JAR args $args | grep -v " switched to "
     else
-	export FLINK_MASTER=
+	# a long-running Flink cluster has already been started on YARN
+        #  (using $FLINK_HOME/bin/yarn-session.sh in a separate window)
+	$FLINK_HOME/bin/flink run -c org.apache.mrql.Main $FULL_JAR args $args | grep -v " switched to "
     fi
-    args=" -flink $*"
-    args=${args// /\!}
-    $FLINK_HOME/bin/flink run -c org.apache.mrql.Main $FULL_JAR args $args | grep -v " switched to "
 else 
-    $JAVA_HOME/bin/java -classpath "$FULL_JAR:$FLINK_JARS" org.apache.mrql.Main -flink $*  | grep -v " switched to "
+    $JAVA_HOME/bin/java -classpath "$FULL_JAR:$FLINK_JARS" org.apache.mrql.Main -flink $* | grep -v " switched to "
 fi

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/05feb9f6/conf/mrql-env.sh
----------------------------------------------------------------------
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index 90e453e..57505c5 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -94,18 +94,15 @@ SPARK_WORKER_CORES=1
 SPARK_WORKER_MEMORY=1G
 
 
-# Optional: Flink configuration. Supports version 0.9.0 only
-# (Flink versions 0.6-incubating, 0.6.1-incubating, 0.7.0-incubating, 0.8.0, and 0.8.1 are supported by MRQL 0.9.2)
+# Optional: Flink configuration. Supports version 0.9.0 only
 # Note: for yarn, set yarn.nodemanager.vmem-check-enabled to false in yarn-site.xml
 FLINK_VERSION=0.9.0
 # Flink installation directory
 FLINK_HOME=${HOME}/flink-${FLINK_VERSION}
-# Hadoop HDFS: needed for Sequence files in Flink mode
-HDFS_JAR=${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-${HADOOP_VERSION}.jar
-# Flink JobManager (it is derived automatically on a yarn cluster)
-if [ "$FLINK_MASTER" = "" ]; then
-    FLINK_MASTER=`hostname`:6123
-fi
+# number of slots per TaskManager (typically, the number of CPUs per machine)
+FLINK_SLOTS=4
+# memory per TaskManager
+FLINK_TASK_MANAGER_MEMORY=2048
 
 
 # Claspaths

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/05feb9f6/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index 30f9a2e..e3f6e9d 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -20,6 +20,7 @@ package org.apache.mrql;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Arrays;
 import java.io.*;
 import java.net.URI;
 import java.net.URL;
@@ -39,6 +40,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.functions.*;
 import org.apache.flink.api.java.operators.*;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.client.program.ContextEnvironment;
 
 
 /** Evaluates physical plans in Apache Flink mode */
@@ -48,7 +50,7 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
     public static StreamExecutionEnvironment stream_env;
     // an HDFS tmp file used to hold the data source directory information in distributed mode
     static String data_source_dir_name;
-    static String master_host = "localhost";
+    static String master_host = "";
     static int master_port = 6123;
     static String fs_default_name;
 
@@ -70,11 +72,13 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
             String master_node = System.getenv("FLINK_MASTER");
             if (master_node == null)
                 throw new Error("Need to run the Flink application master first: $FLINK_HOME/bin/yarn-session.sh");
-            String[] m = master_node.split(":");
-            if (m.length != 2)
-                throw new Error("Need both host name and port number for the Flink application master: "+master_node);
-            master_host = m[0];
-            master_port = Integer.parseInt(m[1]);
+            if (!master_node.equals("")) {
+		String[] m = master_node.split(":");
+		if (m.length != 2)
+		    throw new Error("Need both host name and port number for the Flink application master: "+master_node);
+		master_host = m[0];
+		master_port = Integer.parseInt(m[1]);
+	    };
             fs_default_name = System.getenv("FS_DEFAULT_NAME");
             data_source_dir_name = absolute_path("tmp/data_source_dir.txt");
 	    Plan.conf.set("fs.default.name",fs_default_name);
@@ -88,18 +92,26 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
     final public void initialize_query () {
         try {
             if (!Config.local_mode) {
-                Plan.distribute_compiled_arguments(Plan.conf);
-                if (Config.compile_functional_arguments)
-                    flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
-                                        Config.nodes,flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path"));
+		if (Config.compile_functional_arguments) {
+		    Plan.distribute_compiled_arguments(Plan.conf);
+		    if (master_host.equals("")) {
+			ContextEnvironment ce = (ContextEnvironment)ExecutionEnvironment.getExecutionEnvironment();
+			List<File> jars = Arrays.asList(new File(flink_jar.toURI().getPath()),
+							new File(Plan.conf.get("mrql.jar.path")));
+			flink_env = new ContextEnvironment(ce.getClient(),jars,FlinkEvaluator.class.getClassLoader(),true);
+			flink_env.setParallelism(Config.nodes);
+		    } else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
+					     Config.nodes,flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path"));
+		} else if (master_host.equals(""))
+		    flink_env = ExecutionEnvironment.getExecutionEnvironment();
                 else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
-                                        Config.nodes,flink_jar.toURI().getPath());
-            } else {
-                flink_env.setDefaultLocalParallelism(Config.nodes);
+					     Config.nodes,flink_jar.toURI().getPath());
+	    } else {
+		flink_env.setDefaultLocalParallelism(Config.nodes);
                 if (Config.stream_window > 0)
                     stream_env = StreamExecutionEnvironment.createLocalEnvironment(Config.nodes)
-                                          .setBufferTimeout(Config.stream_window);
-            }
+			               .setBufferTimeout(Config.stream_window);
+            };
         } catch (Exception ex) {
             throw new Error("Cannot initialize the Flink evaluator: "+ex);
         }