You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:45:53 UTC
[57/82] [abbrv] incubator-flink git commit: Removed JobStatusListener and ExecutionListener. Fixed LocalExecutor output for maven verify.
Removed JobStatusListener and ExecutionListener. Fixed LocalExecutor output for maven verify.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8eadd3ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8eadd3ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8eadd3ec
Branch: refs/heads/master
Commit: 8eadd3ec2e07aa5eec6daccc6849b1038bbeae5f
Parents: c175ebe
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 17 17:06:13 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:31 2014 +0100
----------------------------------------------------------------------
.../flink/streaming/util/ClusterUtil.java | 1 -
.../main/java/org/apache/flink/yarn/Client.java | 9 +-
.../apache/flink/yarn/ApplicationClient.scala | 4 +-
.../apache/flink/yarn/ApplicationMaster.scala | 2 +-
.../org/apache/flink/client/LocalExecutor.java | 8 +-
.../src/main/flink-bin/bin/start-cluster.sh | 2 +-
.../src/main/flink-bin/bin/start-local.sh | 2 +-
.../src/main/flink-bin/bin/stop-cluster.sh | 2 +-
flink-dist/src/main/flink-bin/bin/stop-local.sh | 2 +-
.../web-docs-infoserver/js/jquery-2.1.0.js | 9111 ------------------
.../apache/flink/runtime/blob/BlobClient.java | 6 +-
.../apache/flink/runtime/blob/BlobServer.java | 18 +-
.../runtime/execution/ExecutionListener.java | 33 -
.../runtime/executiongraph/ExecutionGraph.java | 40 -
.../executiongraph/JobStatusListener.java | 36 -
.../runtime/profiling/TaskManagerProfiler.java | 13 +-
.../apache/flink/runtime/taskmanager/Task.java | 31 +-
.../web-docs-infoserver/js/jquery-1.10.2.js | 6 -
.../web-docs-infoserver/js/jquery-2.1.0.js | 9111 ++++++++++++++++++
.../flink/runtime/taskmanager/TaskManager.scala | 6 +-
.../clients/examples/LocalExecutorITCase.java | 1 +
...terationNotDependingOnSolutionSetITCase.java | 44 +-
22 files changed, 9183 insertions(+), 9305 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index a7b7137..f75db68 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.util;
import java.net.InetSocketAddress;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
index b7c8e51..1de61a8 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.jar.JarFile;
import akka.actor.ActorRef;
@@ -71,6 +72,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
+import scala.concurrent.duration.FiniteDuration;
/**
* All classes in this package contain code taken from
@@ -328,6 +330,9 @@ public class Client {
LOG.warn("Unable to find job manager port in configuration!");
jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
}
+ FiniteDuration timeout = new FiniteDuration(GlobalConfiguration.getInteger
+ (ConfigConstants.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT),
+ TimeUnit.SECONDS);
conf = Utils.initializeYarnConfiguration();
@@ -520,7 +525,6 @@ public class Client {
// file that we write into the conf/ dir containing the jobManager address and the dop.
yarnPropertiesFile = new File(confDirPath + CliFrontend.YARN_PROPERTIES_FILE);
-
LOG.info("Submitting application master " + appId);
yarnClient.submitApplication(appContext);
@@ -533,7 +537,8 @@ public class Client {
// start application client
LOG.info("Start application client.");
applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, appId, jmPort,
- yarnClient, confDirPath, slots, taskManagerCount, dynamicPropertiesEncoded));
+ yarnClient, confDirPath, slots, taskManagerCount, dynamicPropertiesEncoded,
+ timeout));
actorSystem.awaitTermination();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index d80f133..58ce6cf 100644
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -35,7 +35,7 @@ import scala.concurrent.duration._
class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient,
confDirPath: String, slots: Int, numTaskManagers: Int,
- dynamicPropertiesEncoded: String)
+ dynamicPropertiesEncoded: String, timeout: FiniteDuration)
extends Actor with Consumer with ActorLogMessages with ActorLogging {
import context._
@@ -85,7 +85,7 @@ class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient,
writeYarnProperties(address)
- jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address)))
+ jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address))(system, timeout))
jobManager.get ! RegisterMessageListener
pollingTimer foreach {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 628db6d..64db0ad 100644
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -159,7 +159,7 @@ object ApplicationMaster{
val args = Array[String]("--configDir", pathToConfig)
LOG.info(s"Config path: ${pathToConfig}.")
- val (hostname, port, configuration) = JobManager.parseArgs(args)
+ val (hostname, port, configuration, _) = JobManager.parseArgs(args)
implicit val jobManagerSystem = YarnUtils.createActorSystem(hostname, port, configuration)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index bc021c1..55fda89 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -60,6 +60,8 @@ public class LocalExecutor extends PlanExecutor {
private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
+
+ private boolean printStatusDuringExecution = true;
// --------------------------------------------------------------------------------------------
@@ -82,6 +84,10 @@ public class LocalExecutor extends PlanExecutor {
public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
public int getTaskManagerNumSlots() { return this.taskManagerNumSlots; }
+
+ public void setPrintStatusDuringExecution(boolean printStatus) {
+ this.printStatusDuringExecution = printStatus;
+ }
// --------------------------------------------------------------------------------------------
@@ -164,7 +170,7 @@ public class LocalExecutor extends PlanExecutor {
ActorRef jobClient = flink.getJobClient();
- return JobClient.submitJobAndWait(jobGraph, true, jobClient);
+ return JobClient.submitJobAndWait(jobGraph, printStatusDuringExecution, jobClient);
}
finally {
if (shutDownAtEnd) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index fe97899..c447edb 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -35,7 +35,7 @@ if [ ! -f "$HOSTLIST" ]; then
fi
# cluster mode, bring up job manager locally and a task manager on every slave host
-"$FLINK_BIN_DIR"/jobManager.sh start cluster
+"$FLINK_BIN_DIR"/jobmanager.sh start cluster
GOON=true
while $GOON
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-dist/src/main/flink-bin/bin/start-local.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.sh b/flink-dist/src/main/flink-bin/bin/start-local.sh
index 83f621a..f382763 100755
--- a/flink-dist/src/main/flink-bin/bin/start-local.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-local.sh
@@ -24,4 +24,4 @@ bin=`cd "$bin"; pwd`
. "$bin"/config.sh
# local mode, only bring up job manager. The job manager will start an internal task manager
-"$FLINK_BIN_DIR"/jobManager.sh start local
+"$FLINK_BIN_DIR"/jobmanager.sh start local
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
index 5fba480..d9fe6f6 100755
--- a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
@@ -46,4 +46,4 @@ do
done < $HOSTLIST
# cluster mode, stop the job manager locally and stop the task manager on every slave host
-"$FLINK_BIN_DIR"/jobManager.sh stop
+"$FLINK_BIN_DIR"/jobmanager.sh stop
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8eadd3ec/flink-dist/src/main/flink-bin/bin/stop-local.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-local.sh b/flink-dist/src/main/flink-bin/bin/stop-local.sh
index c576723..79627fa 100755
--- a/flink-dist/src/main/flink-bin/bin/stop-local.sh
+++ b/flink-dist/src/main/flink-bin/bin/stop-local.sh
@@ -24,4 +24,4 @@ bin=`cd "$bin"; pwd`
. "$bin"/config.sh
# stop local job manager (has an internal task manager)
-"$FLINK_BIN_DIR"/jobManager.sh stop
+"$FLINK_BIN_DIR"/jobmanager.sh stop