You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2015/07/25 03:01:36 UTC
incubator-mrql git commit: [MRQL-77] Support single Flink jobs on YARN
Repository: incubator-mrql
Updated Branches:
refs/heads/master 4c1fbcf74 -> 05feb9f65
[MRQL-77] Support single Flink jobs on YARN
Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/05feb9f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/05feb9f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/05feb9f6
Branch: refs/heads/master
Commit: 05feb9f6520335828e246ba1f07ca82e7300e10c
Parents: 4c1fbcf
Author: fegaras <fe...@cse.uta.edu>
Authored: Fri Jul 24 17:53:31 2015 -0500
Committer: fegaras <fe...@cse.uta.edu>
Committed: Fri Jul 24 17:53:31 2015 -0500
----------------------------------------------------------------------
bin/mrql.flink | 40 +++++++++++++------
conf/mrql-env.sh | 13 +++---
.../java/org/apache/mrql/FlinkEvaluator.gen | 42 +++++++++++++-------
3 files changed, 60 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/05feb9f6/bin/mrql.flink
----------------------------------------------------------------------
diff --git a/bin/mrql.flink b/bin/mrql.flink
index f3b4b37..135a9de 100755
--- a/bin/mrql.flink
+++ b/bin/mrql.flink
@@ -21,7 +21,6 @@
#--------------------------------------------------------------------------------
#
# run Apache MRQL in Apache Flink mode
-# (need to run $FLINK_HOME/bin/yarn-session.sh on a separate window first)
#
#--------------------------------------------------------------------------------
@@ -35,7 +34,7 @@ MRQL_JAR=`ls "$MRQL_HOME"/lib/mrql-flink-*.jar`
FULL_JAR="/tmp/${USER}_mrql_flink.jar"
CLASS_DIR="/tmp/${USER}_mrql_classes"
-export FLINK_HOME FLINK_JARS FLINK_MASTER FS_DEFAULT_NAME
+export FLINK_HOME FLINK_JARS FS_DEFAULT_NAME
if [[ ($MRQL_JAR -nt $FULL_JAR) ]]; then
rm -rf $CLASS_DIR
@@ -46,24 +45,41 @@ if [[ ($MRQL_JAR -nt $FULL_JAR) ]]; then
$JAVA_HOME/bin/jar xf $GEN_JAR
$JAVA_HOME/bin/jar xf $CORE_JAR
$JAVA_HOME/bin/jar xf $MRQL_JAR
- $JAVA_HOME/bin/jar xf $HDFS_JAR
cd ..
$JAVA_HOME/bin/jar cf $FULL_JAR -C $CLASS_DIR .
popd > /dev/null
fi
if [ "$1" == "-dist" ]; then
- if ([ -a $FLINK_HOME/conf/.yarn-jobmanager ]); then
- export FLINK_MASTER=`cat $FLINK_HOME/conf/.yarn-jobmanager`
- elif ([ -a $FLINK_HOME/conf/.yarn-properties ]); then
+ args=" -flink $*"
+ args=${args// /\!}
+ if ([ -a $FLINK_HOME/conf/.yarn-properties ]); then
. $FLINK_HOME/conf/.yarn-properties
- export FLINK_MASTER=$jobManager
+ host=`echo $jobManager | cut -d : -f 1`
+ port=`echo $jobManager | cut -d : -f 2`
+ fi
+ if ! type nc > /dev/null; then
+ jobManager=
+ elif [ "$host" == "" ] || [ "$port" == "" ] || [ "`nc -z $host $port`" == "" ]; then
+ # there is no Flink cluster running (host:port is closed)
+ jobManager=
+ fi
+ export FLINK_MASTER=$jobManager
+ if [ "$jobManager" == "" ]; then
+ # there is no Flink cluster running on YARN, so run this as a single job on YARN
+ yn=$FLINK_SLOTS
+ ARGS=($*)
+ for (( i = 0; i < $#; i++ )); do
+ if [ "${ARGS[i]}" = "-nodes" ]; then
+ yn=$(( ${ARGS[i+1]} / $FLINK_SLOTS ))
+ fi
+ done
+ $FLINK_HOME/bin/flink run -m yarn-cluster -yn $yn -ys $FLINK_SLOTS -ytm $FLINK_TASK_MANAGER_MEMORY -c org.apache.mrql.Main $FULL_JAR args $args | grep -v " switched to "
else
- export FLINK_MASTER=
+ # a long-running Flink cluster has already been started on YARN
+ # (started with $FLINK_HOME/bin/yarn-session.sh in a separate window)
+ $FLINK_HOME/bin/flink run -c org.apache.mrql.Main $FULL_JAR args $args | grep -v " switched to "
fi
- args=" -flink $*"
- args=${args// /\!}
- $FLINK_HOME/bin/flink run -c org.apache.mrql.Main $FULL_JAR args $args | grep -v " switched to "
else
- $JAVA_HOME/bin/java -classpath "$FULL_JAR:$FLINK_JARS" org.apache.mrql.Main -flink $* | grep -v " switched to "
+ $JAVA_HOME/bin/java -classpath "$FULL_JAR:$FLINK_JARS" org.apache.mrql.Main -flink $* | grep -v " switched to "
fi
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/05feb9f6/conf/mrql-env.sh
----------------------------------------------------------------------
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index 90e453e..57505c5 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -94,18 +94,15 @@ SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=1G
-# Optional: Flink configuration. Supports version 0.9.0 only
-# (Flink versions 0.6-incubating, 0.6.1-incubating, 0.7.0-incubating, 0.8.0, and 0.8.1 are supported by MRQL 0.9.2)
+# Optional: Flink configuration. Supports version 0.9.0 only
# Note: for yarn, set yarn.nodemanager.vmem-check-enabled to false in yarn-site.xml
FLINK_VERSION=0.9.0
# Flink installation directory
FLINK_HOME=${HOME}/flink-${FLINK_VERSION}
-# Hadoop HDFS: needed for Sequence files in Flink mode
-HDFS_JAR=${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-${HADOOP_VERSION}.jar
-# Flink JobManager (it is derived automatically on a yarn cluster)
-if [ "$FLINK_MASTER" = "" ]; then
- FLINK_MASTER=`hostname`:6123
-fi
+# number of slots per TaskManager (typically, the number of CPUs per machine)
+FLINK_SLOTS=4
+# memory per TaskManager
+FLINK_TASK_MANAGER_MEMORY=2048
# Classpaths
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/05feb9f6/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index 30f9a2e..e3f6e9d 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -20,6 +20,7 @@ package org.apache.mrql;
import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Arrays;
import java.io.*;
import java.net.URI;
import java.net.URL;
@@ -39,6 +40,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.functions.*;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.client.program.ContextEnvironment;
/** Evaluates physical plans in Apache Flink mode */
@@ -48,7 +50,7 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
public static StreamExecutionEnvironment stream_env;
// an HDFS tmp file used to hold the data source directory information in distributed mode
static String data_source_dir_name;
- static String master_host = "localhost";
+ static String master_host = "";
static int master_port = 6123;
static String fs_default_name;
@@ -70,11 +72,13 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
String master_node = System.getenv("FLINK_MASTER");
if (master_node == null)
throw new Error("Need to run the Flink application master first: $FLINK_HOME/bin/yarn-session.sh");
- String[] m = master_node.split(":");
- if (m.length != 2)
- throw new Error("Need both host name and port number for the Flink application master: "+master_node);
- master_host = m[0];
- master_port = Integer.parseInt(m[1]);
+ if (!master_node.equals("")) {
+ String[] m = master_node.split(":");
+ if (m.length != 2)
+ throw new Error("Need both host name and port number for the Flink application master: "+master_node);
+ master_host = m[0];
+ master_port = Integer.parseInt(m[1]);
+ };
fs_default_name = System.getenv("FS_DEFAULT_NAME");
data_source_dir_name = absolute_path("tmp/data_source_dir.txt");
Plan.conf.set("fs.default.name",fs_default_name);
@@ -88,18 +92,26 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
final public void initialize_query () {
try {
if (!Config.local_mode) {
- Plan.distribute_compiled_arguments(Plan.conf);
- if (Config.compile_functional_arguments)
- flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
- Config.nodes,flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path"));
+ if (Config.compile_functional_arguments) {
+ Plan.distribute_compiled_arguments(Plan.conf);
+ if (master_host.equals("")) {
+ ContextEnvironment ce = (ContextEnvironment)ExecutionEnvironment.getExecutionEnvironment();
+ List<File> jars = Arrays.asList(new File(flink_jar.toURI().getPath()),
+ new File(Plan.conf.get("mrql.jar.path")));
+ flink_env = new ContextEnvironment(ce.getClient(),jars,FlinkEvaluator.class.getClassLoader(),true);
+ flink_env.setParallelism(Config.nodes);
+ } else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
+ Config.nodes,flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path"));
+ } else if (master_host.equals(""))
+ flink_env = ExecutionEnvironment.getExecutionEnvironment();
else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
- Config.nodes,flink_jar.toURI().getPath());
- } else {
- flink_env.setDefaultLocalParallelism(Config.nodes);
+ Config.nodes,flink_jar.toURI().getPath());
+ } else {
+ flink_env.setDefaultLocalParallelism(Config.nodes);
if (Config.stream_window > 0)
stream_env = StreamExecutionEnvironment.createLocalEnvironment(Config.nodes)
- .setBufferTimeout(Config.stream_window);
- }
+ .setBufferTimeout(Config.stream_window);
+ };
} catch (Exception ex) {
throw new Error("Cannot initialize the Flink evaluator: "+ex);
}