You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:53 UTC

[57/82] [abbrv] incubator-flink git commit: Removed JobStatusListener and ExecutionListener. Fixed LocalExecutor output for maven verify.

Removed JobStatusListener and ExecutionListener. Fixed LocalExecutor output for maven verify.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8eadd3ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8eadd3ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8eadd3ec

Branch: refs/heads/master
Commit: 8eadd3ec2e07aa5eec6daccc6849b1038bbeae5f
Parents: c175ebe
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 17 17:06:13 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:31 2014 +0100

----------------------------------------------------------------------
 .../flink/streaming/util/ClusterUtil.java       |    1 -
 .../main/java/org/apache/flink/yarn/Client.java |    9 +-
 .../apache/flink/yarn/ApplicationClient.scala   |    4 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |    2 +-
 .../org/apache/flink/client/LocalExecutor.java  |    8 +-
 .../src/main/flink-bin/bin/start-cluster.sh     |    2 +-
 .../src/main/flink-bin/bin/start-local.sh       |    2 +-
 .../src/main/flink-bin/bin/stop-cluster.sh      |    2 +-
 flink-dist/src/main/flink-bin/bin/stop-local.sh |    2 +-
 .../web-docs-infoserver/js/jquery-2.1.0.js      | 9111 ------------------
 .../apache/flink/runtime/blob/BlobClient.java   |    6 +-
 .../apache/flink/runtime/blob/BlobServer.java   |   18 +-
 .../runtime/execution/ExecutionListener.java    |   33 -
 .../runtime/executiongraph/ExecutionGraph.java  |   40 -
 .../executiongraph/JobStatusListener.java       |   36 -
 .../runtime/profiling/TaskManagerProfiler.java  |   13 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   31 +-
 .../web-docs-infoserver/js/jquery-1.10.2.js     |    6 -
 .../web-docs-infoserver/js/jquery-2.1.0.js      | 9111 ++++++++++++++++++
 .../flink/runtime/taskmanager/TaskManager.scala |    6 +-
 .../clients/examples/LocalExecutorITCase.java   |    1 +
 ...terationNotDependingOnSolutionSetITCase.java |   44 +-
 22 files changed, 9183 insertions(+), 9305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index a7b7137..f75db68 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.util;
 
 import java.net.InetSocketAddress;
 
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
index b7c8e51..1de61a8 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.jar.JarFile;
 
 import akka.actor.ActorRef;
@@ -71,6 +72,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Records;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * All classes in this package contain code taken from
@@ -328,6 +330,9 @@ public class Client {
 			LOG.warn("Unable to find job manager port in configuration!");
 			jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
 		}
+		FiniteDuration timeout = new FiniteDuration(GlobalConfiguration.getInteger
+				(ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT),
+				TimeUnit.SECONDS);
 
 		conf = Utils.initializeYarnConfiguration();
 
@@ -520,7 +525,6 @@ public class Client {
 		// file that we write into the conf/ dir containing the jobManager address and the dop.
 		yarnPropertiesFile = new File(confDirPath + CliFrontend.YARN_PROPERTIES_FILE);
 
-
 		LOG.info("Submitting application master " + appId);
 		yarnClient.submitApplication(appContext);
 
@@ -533,7 +537,8 @@ public class Client {
 		// start application client
 		LOG.info("Start application client.");
 		applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, appId, jmPort,
-				yarnClient, confDirPath, slots, taskManagerCount, dynamicPropertiesEncoded));
+				yarnClient, confDirPath, slots, taskManagerCount, dynamicPropertiesEncoded,
+				timeout));
 
 		actorSystem.awaitTermination();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index d80f133..58ce6cf 100644
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -35,7 +35,7 @@ import scala.concurrent.duration._
 
 class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient,
                         confDirPath: String, slots: Int, numTaskManagers: Int,
-                        dynamicPropertiesEncoded: String)
+                        dynamicPropertiesEncoded: String, timeout: FiniteDuration)
   extends Actor with Consumer with ActorLogMessages with ActorLogging {
   import context._
 
@@ -85,7 +85,7 @@ class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient,
 
           writeYarnProperties(address)
 
-          jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address)))
+          jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address))(system, timeout))
           jobManager.get ! RegisterMessageListener
 
           pollingTimer foreach {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 628db6d..64db0ad 100644
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -159,7 +159,7 @@ object ApplicationMaster{
     val args = Array[String]("--configDir", pathToConfig)
 
     LOG.info(s"Config path: ${pathToConfig}.")
-    val (hostname, port, configuration) = JobManager.parseArgs(args)
+    val (hostname, port, configuration, _) = JobManager.parseArgs(args)
 
     implicit val jobManagerSystem = YarnUtils.createActorSystem(hostname, port, configuration)
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index bc021c1..55fda89 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -60,6 +60,8 @@ public class LocalExecutor extends PlanExecutor {
 	private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 
 	private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
+
+	private boolean printStatusDuringExecution = true;
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -82,6 +84,10 @@ public class LocalExecutor extends PlanExecutor {
 	public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
 
 	public int getTaskManagerNumSlots() { return this.taskManagerNumSlots; }
+
+	public void setPrintStatusDuringExecution(boolean printStatus) {
+		this.printStatusDuringExecution = printStatus;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -164,7 +170,7 @@ public class LocalExecutor extends PlanExecutor {
 
 				ActorRef jobClient = flink.getJobClient();
 
-				return JobClient.submitJobAndWait(jobGraph, true, jobClient);
+				return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, jobClient);
 			}
 			finally {
 				if (shutDownAtEnd) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index fe97899..c447edb 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -35,7 +35,7 @@ if [ ! -f "$HOSTLIST" ]; then
 fi
 
 # cluster mode, bring up job manager locally and a task manager on every slave host
-"$FLINK_BIN_DIR"/jobManager.sh start cluster
+"$FLINK_BIN_DIR"/jobmanager.sh start cluster
 
 GOON=true
 while $GOON

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-dist/src/main/flink-bin/bin/start-local.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.sh b/flink-dist/src/main/flink-bin/bin/start-local.sh
index 83f621a..f382763 100755
--- a/flink-dist/src/main/flink-bin/bin/start-local.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-local.sh
@@ -24,4 +24,4 @@ bin=`cd "$bin"; pwd`
 . "$bin"/config.sh
 
 # local mode, only bring up job manager. The job manager will start an internal task manager
-"$FLINK_BIN_DIR"/jobManager.sh start local
+"$FLINK_BIN_DIR"/jobmanager.sh start local

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
index 5fba480..d9fe6f6 100755
--- a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
@@ -46,4 +46,4 @@ do
 done < $HOSTLIST
 
 # cluster mode, stop the job manager locally and stop the task manager on every slave host
-"$FLINK_BIN_DIR"/jobManager.sh stop
+"$FLINK_BIN_DIR"/jobmanager.sh stop

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-dist/src/main/flink-bin/bin/stop-local.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-local.sh b/flink-dist/src/main/flink-bin/bin/stop-local.sh
index c576723..79627fa 100755
--- a/flink-dist/src/main/flink-bin/bin/stop-local.sh
+++ b/flink-dist/src/main/flink-bin/bin/stop-local.sh
@@ -24,4 +24,4 @@ bin=`cd "$bin"; pwd`
 . "$bin"/config.sh
 
 # stop local job manager (has an internal task manager)
-"$FLINK_BIN_DIR"/jobManager.sh stop
+"$FLINK_BIN_DIR"/jobmanager.sh stop