You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/08/31 12:31:45 UTC
[09/10] flink git commit: [FLINK-2291] [runtime] Add ZooKeeper
support to elect a leader from a set of JobManager. The leader will then be
retrieved from ZooKeeper by the TaskManagers.
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 355da2d..dc242b2 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -452,6 +452,31 @@ public final class ConfigConstants {
public static final String FLINK_JVM_OPTIONS = "env.java.opts";
+ // --------------------------- Recovery -----------------------------------
+
+ /** Defines recovery mode used for the cluster execution ("standalone", "zookeeper") */
+ public static final String RECOVERY_MODE = "recovery.mode";
+
+ // --------------------------- ZooKeeper ----------------------------------
+
+ /** ZooKeeper servers. */
+ public static final String ZOOKEEPER_QUORUM_KEY = "ha.zookeeper.quorum";
+
+ /** ZooKeeper root path. */
+ public static final String ZOOKEEPER_DIR_KEY = "ha.zookeeper.dir";
+
+ public static final String ZOOKEEPER_LATCH_PATH = "ha.zookeeper.dir.latch";
+
+ public static final String ZOOKEEPER_LEADER_PATH = "ha.zookeeper.dir.leader";
+
+ public static final String ZOOKEEPER_SESSION_TIMEOUT = "ha.zookeeper.client.session-timeout";
+
+ public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "ha.zookeeper.client.connection-timeout";
+
+ public static final String ZOOKEEPER_RETRY_WAIT = "ha.zookeeper.client.retry-wait";
+
+ public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "ha.zookeeper.client.max-retry-attempts";
+
// ------------------------------------------------------------------------
// Default Values
// ------------------------------------------------------------------------
@@ -694,33 +719,23 @@ public final class ConfigConstants {
/**
* Sets the number of local task managers
*/
- public static final String LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER = "localinstancemanager.numtaskmanager";
-
- public static final String LOCAL_INSTANCE_MANAGER_START_WEBSERVER = "localinstancemanager.start-webserver";
-
- // --------------------------- ZooKeeper ----------------------------------
-
- /** ZooKeeper servers. */
- public static final String ZOOKEEPER_QUORUM_KEY = "ha.zookeeper.quorum";
-
- /** ZooKeeper root path. */
- public static final String ZOOKEEPER_DIR_KEY = "ha.zookeeper.dir";
+ public static final String LOCAL_NUMBER_TASK_MANAGER = "local.number-taskmanager";
- public static final String ZOOKEEPER_LATCH_PATH = "ha.zookeeper.dir.latch";
+ public static final int DEFAULT_LOCAL_NUMBER_TASK_MANAGER = 1;
- public static final String ZOOKEEPER_LEADER_PATH = "ha.zookeeper.dir.leader";
+ public static final String LOCAL_NUMBER_JOB_MANAGER = "local.number-jobmanager";
- public static final String ZOOKEEPER_SESSION_TIMEOUT = "ha.zookeeper.client.session-timeout";
+ public static final int DEFAULT_LOCAL_NUMBER_JOB_MANAGER = 1;
- public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "ha.zookeeper.client.connection-timeout";
+ public static final String LOCAL_START_WEBSERVER = "local.start-webserver";
- public static final String ZOOKEEPER_RETRY_WAIT = "ha.zookeeper.client.retry-wait";
+ // --------------------------- Recovery ---------------------------------
- public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "ha.zookeeper.client.max-retry-attempts";
+ public static final String DEFAULT_RECOVERY_MODE = "standalone"; // default value of RECOVERY_MODE ("standalone" or "zookeeper")
- // - Defaults -------------------------------------------------------------
+ // --------------------------- ZooKeeper ----------------------------------
- public static final String DEFAULT_ZOOKEEPER_ZNODE_ROOT = "/flink";
+ public static final String DEFAULT_ZOOKEEPER_DIR_KEY = "/flink";
public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch";
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
index 3676e62..dac5144 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
@@ -22,10 +22,6 @@ package org.apache.flink.types.parser;
import org.apache.flink.types.parser.ByteParser;
import org.apache.flink.types.parser.FieldParser;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
public class ByteParserTest extends ParserTestBase<Byte> {
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java b/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java
index 8de3b94..8134a68 100644
--- a/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/AbstractIDTest.java
@@ -25,8 +25,6 @@ import static org.junit.Assert.fail;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
-import java.nio.ByteBuffer;
-
/**
* This class contains tests for the {@link org.apache.flink.util.AbstractID} class.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index e79ff71..9df6da5 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -278,6 +278,10 @@ under the Apache License (v 2.0):
- Jansi (org.fusesource.jansi:jansi:1.4 - https://github.com/fusesource/jansi)
- Apache Camel Core (org.apache.camel:camel-core:2.10.3 - http://camel.apache.org/camel-core.html)
- Apache Commons Math (org.apache.commons:commons-math3:3.5 - http://commons.apache.org/proper/commons-math/index.html)
+ - Apache ZooKeeper (org.apache.zookeeper:zookeeper:3.4.6 - https://zookeeper.apache.org/)
+ - Apache Curator (org.apache.curator:curator-recipes:2.8.0 - http://curator.apache.org/)
+ - Apache Curator (org.apache.curator:curator-framework:2.8.0 - http://curator.apache.org/)
+ - Apache Curator (org.apache.curator:curator-client:2.8.0 - http://curator.apache.org/)
-----------------------------------------------------------------------
@@ -356,6 +360,7 @@ BSD-style licenses:
- ASM (org.ow2.asm:asm:5.0.4 - http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
- Grizzled SLF4J (org.clapper:grizzled-slf4j_2.10:1.0.2 - http://software.clapper.org/grizzled-slf4j/) - Copyright (c) 2010 Brian M. Clapper
- ParaNamer (com.thoughtworks.paranamer:paranamer:2.3 - https://github.com/paul-hammant/paranamer) - Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc
+ - JLine (jline:jline:0.9.94 - http://jline.sourceforge.net/) - Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>
(Below is the 3-clause BSD license)
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-dist/src/main/flink-bin/NOTICE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/NOTICE b/flink-dist/src/main/flink-bin/NOTICE
index 7b0fe72..19af75f 100644
--- a/flink-dist/src/main/flink-bin/NOTICE
+++ b/flink-dist/src/main/flink-bin/NOTICE
@@ -52,6 +52,28 @@ available from http://www.digip.org/jansson/.
-----------------------------------------------------------------------
+ Apache ZooKeeper
+-----------------------------------------------------------------------
+
+Apache ZooKeeper
+Copyright 2009-2014 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+-----------------------------------------------------------------------
+ Apache Curator
+-----------------------------------------------------------------------
+
+Apache Curator
+Copyright 2013-2014 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+-----------------------------------------------------------------------
Apache Sling
-----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 1d31493..04afa02 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -94,7 +94,7 @@ KEY_ENV_LOG_MAX="env.log.max"
KEY_ENV_JAVA_HOME="env.java.home"
KEY_ENV_JAVA_OPTS="env.java.opts"
KEY_ENV_SSH_OPTS="env.ssh.opts"
-KEY_ZK_QUORUM="ha.zookeeper.quorum"
+KEY_RECOVERY_MODE="recovery.mode"
KEY_ZK_HEAP_MB="zookeeper.heap.mb"
########################################################################################################################
@@ -205,8 +205,8 @@ if [ -z "${ZK_HEAP}" ]; then
ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
fi
-if [ -z "${ZK_QUORUM}" ]; then
- ZK_QUORUM=$(readFromConfig ${KEY_ZK_QUORUM} "" "${YAML_CONF}")
+if [ -z "${RECOVERY_MODE}" ]; then
+ RECOVERY_MODE=$(readFromConfig ${KEY_RECOVERY_MODE} "standalone" "${YAML_CONF}")
fi
# Arguments for the JVM. Used for job and task manager JVMs.
@@ -268,13 +268,18 @@ readMasters() {
fi
MASTERS=()
+ WEBUIPORTS=()
GOON=true
while $GOON; do
read line || GOON=false
- HOST=$( extractHostName $line)
- if [ -n "$HOST" ]; then
+ HOSTWEBUIPORT=$( extractHostName $line)
+
+ if [ -n "$HOSTWEBUIPORT" ]; then
+ HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
+ WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:) # -s: empty (instead of the host name) when no ":port" is given
MASTERS+=(${HOST})
+ WEBUIPORTS+=("${WEBUIPORT}") # quoted so that an empty port still adds an element and keeps WEBUIPORTS aligned with MASTERS
fi
done < "$MASTERS_FILE"
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index 450d36b..c89f53a 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -18,12 +18,13 @@
################################################################################
# Start/stop a Flink JobManager.
-USAGE="Usage: jobmanager.sh (start (local|cluster) [batch|streaming] [host])|stop|stop-all)"
+USAGE="Usage: jobmanager.sh (start (local|cluster) [batch|streaming] [host] [webui-port])|stop|stop-all)"
STARTSTOP=$1
EXECUTIONMODE=$2
STREAMINGMODE=$3
HOST=$4 # optional when starting multiple instances
+WEBUIPORT=$5 # optional web UI port (passed on as --webui-port); used when starting multiple instances
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
@@ -65,6 +66,10 @@ if [[ $STARTSTOP == "start" ]]; then
if [ ! -z $HOST ]; then
args="${args} --host $HOST"
fi
+
+ if [ ! -z $WEBUIPORT ]; then
+ args="${args} --webui-port $WEBUIPORT"
+ fi
fi
${bin}/flink-daemon.sh $STARTSTOP jobmanager "${args}"
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index 429285e..1fdd885 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -32,21 +32,26 @@ bin=`cd "$bin"; pwd`
. "$bin"/config.sh
# Start the JobManager instance(s)
-if [[ -z $ZK_QUORUM ]]; then
- echo "Starting cluster (${STREAMING_MODE} mode)."
-
- # Start single JobManager on this machine
- "$bin"/jobmanager.sh start cluster ${STREAMING_MODE}
-else
+shopt -s nocasematch # recovery.mode is compared case-insensitively
+if [[ $RECOVERY_MODE == "zookeeper" ]]; then
# HA Mode
readMasters
- echo "Starting HA cluster (${STREAMING_MODE} mode) with ${#MASTERS[@]} masters and ${#ZK_QUORUM[@]} peers in ZooKeeper quorum."
- for master in ${MASTERS[@]}; do
- ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l $FLINK_BIN_DIR/jobmanager.sh start cluster ${STREAMING_MODE} ${master} &"
+ echo "Starting HA cluster (${STREAMING_MODE} mode) with ${#MASTERS[@]} masters."
+ for ((i=0;i<${#MASTERS[@]};++i)); do
+ master=${MASTERS[i]}
+ webuiport=${WEBUIPORTS[i]}
+ ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l $FLINK_BIN_DIR/jobmanager.sh start cluster ${STREAMING_MODE} ${master} ${webuiport} &"
done
+
+else
+ echo "Starting cluster (${STREAMING_MODE} mode)."
+
+ # Start single JobManager on this machine
+ "$bin"/jobmanager.sh start cluster ${STREAMING_MODE}
fi
+shopt -u nocasematch
# Start TaskManager instance(s)
readSlaves
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
index 17a5daf..d6dd572 100755
--- a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
@@ -30,13 +30,16 @@ for slave in ${SLAVES[@]}; do
done
# Stop JobManager instance(s)
-if [[ -z $ZK_QUORUM ]]; then
- "$bin"/jobmanager.sh stop
-else
- # HA Mode
+shopt -s nocasematch
+if [[ $RECOVERY_MODE == "zookeeper" ]]; then
+ # HA Mode
readMasters
for master in ${MASTERS[@]}; do
ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l $bin/jobmanager.sh stop &"
done
+
+else
+ "$bin"/jobmanager.sh stop
fi
+shopt -u nocasematch
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index a017f3a..4efb7ad 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -30,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration;
import java.util.WeakHashMap;
/**
- * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archiver.
+ * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archive.
* <p>
* The holder will cache the ExecutionGraph behind a weak reference, which will be cleared
* at some point once no one else is pointing to the ExecutionGraph.
@@ -38,27 +40,34 @@ import java.util.WeakHashMap;
* stay valid.
*/
public class ExecutionGraphHolder {
-
- private final ActorGateway source;
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);
+
+ /** Retrieves the current leading JobManager and its corresponding archive */
+ private final JobManagerArchiveRetriever retriever;
private final FiniteDuration timeout;
private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();
-
-
- public ExecutionGraphHolder(ActorGateway source) {
- this(source, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+
+ public ExecutionGraphHolder(JobManagerArchiveRetriever retriever) {
+ this(retriever, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
}
- public ExecutionGraphHolder(ActorGateway source, FiniteDuration timeout) {
- if (source == null || timeout == null) {
+ public ExecutionGraphHolder(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+ if (retriever == null || timeout == null) {
throw new NullPointerException();
}
- this.source = source;
+ this.retriever = retriever;
this.timeout = timeout;
}
-
-
+
+ /**
+ * Retrieves the execution graph with {@link JobID} jid or null if it cannot be found.
+ *
+ * @param jid jobID of the execution graph to be retrieved
+ * @return the retrieved execution graph or null if it is not retrievable
+ */
public ExecutionGraph getExecutionGraph(JobID jid) {
ExecutionGraph cached = cache.get(jid);
if (cached != null) {
@@ -66,19 +75,25 @@ public class ExecutionGraphHolder {
}
try {
- Future<Object> future = source.ask(new JobManagerMessages.RequestJob(jid), timeout);
- Object result = Await.result(future, timeout);
- if (result instanceof JobManagerMessages.JobNotFound) {
+ ActorGateway jobManager = retriever.getJobManagerGateway();
+
+ if (jobManager != null) {
+
+ Future<Object> future = jobManager.ask(new JobManagerMessages.RequestJob(jid), timeout);
+ Object result = Await.result(future, timeout);
+ if (result instanceof JobManagerMessages.JobNotFound) {
+ return null;
+ } else if (result instanceof JobManagerMessages.JobFound) {
+ ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
+ cache.put(jid, eg);
+ return eg;
+ } else {
+ throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
+ }
+ } else {
+ LOG.warn("No connection to the leading JobManager.");
return null;
}
- else if (result instanceof JobManagerMessages.JobFound) {
- ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
- cache.put(jid, eg);
- return eg;
- }
- else {
- throw new RuntimeException("Unknown response from JobManager / Archive: " + result);
- }
}
catch (Exception e) {
throw new RuntimeException("Error requesting execution graph", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerArchiveRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerArchiveRetriever.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerArchiveRetriever.java
new file mode 100644
index 0000000..91c9ad5
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/JobManagerArchiveRetriever.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+/**
+ * Retrieves and stores the actor gateway to the current leading JobManager and its archive. In
+ * case of an error, the {@link WebRuntimeMonitor} to which this instance is associated will be
+ * stopped.
+ */
+public class JobManagerArchiveRetriever implements LeaderRetrievalListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobManagerArchiveRetriever.class);
+
+ private final ActorSystem actorSystem;
+ private final FiniteDuration lookupTimeout;
+ private final FiniteDuration timeout;
+ private final WebMonitor webMonitor;
+
+ /**
+ * Gateways to the currently leading JobManager and to its archive. They are written by
+ * {@link #notifyLeaderAddress} and read concurrently by the web handlers, hence volatile.
+ * Both are null as long as no leader is known.
+ */
+ private volatile ActorGateway jobManagerGateway;
+ private volatile ActorGateway archiveGateway;
+
+ public JobManagerArchiveRetriever(
+ WebMonitor webMonitor,
+ ActorSystem actorSystem,
+ FiniteDuration lookupTimeout,
+ FiniteDuration timeout) {
+ this.webMonitor = webMonitor;
+ this.actorSystem = actorSystem;
+ this.lookupTimeout = lookupTimeout;
+ this.timeout = timeout;
+ }
+
+ /**
+ * Returns the gateway to the currently leading JobManager, or null if no leader is known.
+ */
+ public ActorGateway getJobManagerGateway() {
+ return jobManagerGateway;
+ }
+
+ /**
+ * Returns the gateway to the archive of the currently leading JobManager, or null if no
+ * leader is known.
+ */
+ public ActorGateway getArchiveGateway() {
+ return archiveGateway;
+ }
+
+ /**
+ * Resolves the gateways of the newly announced leader and its archive. A null or empty
+ * address is treated as "no leader known": the gateways are cleared so that no further
+ * requests are sent to the previous leader. New gateways are only published once the archive
+ * lookup succeeded, so a failed lookup does not leave a half-updated pair behind.
+ */
+ @Override
+ public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+ if (leaderAddress == null || leaderAddress.isEmpty()) {
+ clearGateways();
+ return;
+ }
+
+ try {
+ ActorRef jobManager = AkkaUtils.getActorRef(
+ leaderAddress,
+ actorSystem,
+ lookupTimeout);
+ ActorGateway newJobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
+
+ Future<Object> archiveFuture = newJobManagerGateway.ask(
+ JobManagerMessages.getRequestArchive(),
+ timeout);
+
+ ActorRef archive = ((JobManagerMessages.ResponseArchive) Await.result(
+ archiveFuture,
+ timeout)
+ ).actor();
+
+ archiveGateway = new AkkaActorGateway(archive, leaderSessionID);
+ jobManagerGateway = newJobManagerGateway;
+ } catch (Exception e) {
+ // do not keep serving requests from a leader whose gateways could not be resolved
+ clearGateways();
+ handleError(e);
+ }
+ }
+
+ /**
+ * Logs the error and stops the associated web monitor.
+ */
+ @Override
+ public void handleError(Exception exception) {
+ LOG.error("Received error from LeaderRetrievalService.", exception);
+
+ try {
+ // stop associated webMonitor
+ webMonitor.stop();
+ } catch (Exception e) {
+ LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e);
+ }
+ }
+
+ /** Forgets the current leader and its archive. */
+ private void clearGateways() {
+ jobManagerGateway = null;
+ archiveGateway = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 006d18d..4c38cae 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.webmonitor;
+import akka.actor.ActorSystem;
+import com.google.common.base.Preconditions;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -33,7 +35,8 @@ import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.handlers.ExecutionPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
@@ -67,7 +70,7 @@ public class WebRuntimeMonitor implements WebMonitor {
public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
public static final long DEFAULT_REFRESH_INTERVAL = 5000;
-
+
/** Logger for web frontend startup / shutdown messages */
private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
@@ -77,22 +80,36 @@ public class WebRuntimeMonitor implements WebMonitor {
// ------------------------------------------------------------------------
private final Object startupShutdownLock = new Object();
-
- private final Router router;
+
+ private final LeaderRetrievalService leaderRetrievalService;
+
+ /** LeaderRetrievalListener which stores the currently leading JobManager and its archive */
+ private final JobManagerArchiveRetriever retriever;
private final int configuredPort;
+ private final Router router;
+
private ServerBootstrap bootstrap;
private Channel serverChannel;
-
- public WebRuntimeMonitor(Configuration config, ActorGateway jobManager, ActorGateway archive) throws IOException {
+ // ------------------------------------------------------------------------
+
+ public WebRuntimeMonitor(
+ Configuration config,
+ LeaderRetrievalService leaderRetrievalService,
+ ActorSystem actorSystem)
+ throws IOException {
+ Preconditions.checkNotNull(config);
+ this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
+
// figure out where our static contents is
final String configuredWebRoot = config.getString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY, null);
final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
-
+
final File webRootDir;
+
if (configuredWebRoot != null) {
webRootDir = new File(configuredWebRoot);
}
@@ -113,22 +130,27 @@ public class WebRuntimeMonitor implements WebMonitor {
// port configuration
this.configuredPort = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
if (this.configuredPort < 0) {
throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
}
-
- ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(jobManager);
-
+
+ FiniteDuration timeout = AkkaUtils.getTimeout(config);
+ FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(config); // bounds resolving the leader's ActorRef
+
+ retriever = new JobManagerArchiveRetriever(this, actorSystem, lookupTimeout, timeout);
+
+ ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(retriever);
+
router = new Router()
// config how to interact with this web server
.GET("/config", handler(new RequestConfigHandler(DEFAULT_REFRESH_INTERVAL)))
-
+
// the overview - how many task managers, slots, free slots, ...
- .GET("/overview", handler(new RequestOverviewHandler(jobManager)))
+ .GET("/overview", handler(new RequestOverviewHandler(retriever)))
// currently running jobs
- .GET("/jobs", handler(new RequestJobIdsHandler(jobManager)))
+ .GET("/jobs", handler(new RequestJobIdsHandler(retriever)))
.GET("/jobs/:jobid", handler(new JobSummaryHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices", handler(new JobVerticesOverviewHandler(currentGraphs)))
.GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs)))
@@ -137,12 +159,10 @@ public class WebRuntimeMonitor implements WebMonitor {
// .GET("/running/:jobid/:jobvertex", handler(new ExecutionPlanHandler(currentGraphs)))
// the handler for the legacy requests
- .GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
-
+ .GET("/jobsInfo", new JobManagerInfoHandler(retriever, DEFAULT_REQUEST_TIMEOUT))
+
// this handler serves all the static contents
.GET("/:*", new StaticFileServerHandler(webRootDir));
-
-
}
@Override
@@ -151,13 +171,13 @@ public class WebRuntimeMonitor implements WebMonitor {
if (this.bootstrap != null) {
throw new IllegalStateException("The server has already been started");
}
-
+
+ final Handler handler = new Handler(router);
+
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
-
+
@Override
protected void initChannel(SocketChannel ch) {
- Handler handler = new Handler(router);
-
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
@@ -168,7 +188,7 @@ public class WebRuntimeMonitor implements WebMonitor {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
-
+
this.bootstrap = new ServerBootstrap();
this.bootstrap
.group(bossGroup, workerGroup)
@@ -177,18 +197,22 @@ public class WebRuntimeMonitor implements WebMonitor {
Channel ch = this.bootstrap.bind(configuredPort).sync().channel();
this.serverChannel = ch;
-
+
InetSocketAddress bindAddress = (InetSocketAddress) ch.localAddress();
String address = bindAddress.getAddress().getHostAddress();
int port = bindAddress.getPort();
LOG.info("Web frontend listening at " + address + ':' + port);
+
+ leaderRetrievalService.start(retriever);
}
}
@Override
public void stop() throws Exception {
synchronized (startupShutdownLock) {
+ leaderRetrievalService.stop();
+
if (this.serverChannel != null) {
this.serverChannel.close().awaitUninterruptibly();
this.serverChannel = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
index aa1a39f..8a177f4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
+import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
import org.apache.flink.runtime.webmonitor.JsonFactory;
import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
@@ -37,19 +38,19 @@ import java.util.Map;
*/
public class RequestJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse {
- private final ActorGateway target;
+ private final JobManagerArchiveRetriever retriever;
private final FiniteDuration timeout;
- public RequestJobIdsHandler(ActorGateway target) {
- this(target, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+ public RequestJobIdsHandler(JobManagerArchiveRetriever retriever) {
+ this(retriever, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
}
- public RequestJobIdsHandler(ActorGateway target, FiniteDuration timeout) {
- if (target == null || timeout == null) {
+ public RequestJobIdsHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+ if (retriever == null || timeout == null) {
throw new NullPointerException();
}
- this.target = target;
+ this.retriever = retriever;
this.timeout = timeout;
}
@@ -57,9 +58,15 @@ public class RequestJobIdsHandler implements RequestHandler, RequestHandler.Json
public String handleRequest(Map<String, String> params) throws Exception {
// we need no parameters, get all requests
try {
- Future<Object> future = target.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
- JobsWithIDsOverview result = (JobsWithIDsOverview) Await.result(future, timeout);
- return JsonFactory.generateJobsOverviewJSON(result);
+ ActorGateway jobManager = retriever.getJobManagerGateway();
+
+ if (jobManager != null) {
+ Future<Object> future = jobManager.ask(RequestJobsWithIDsOverview.getInstance(), timeout);
+ JobsWithIDsOverview result = (JobsWithIDsOverview) Await.result(future, timeout);
+ return JsonFactory.generateJobsOverviewJSON(result);
+ } else {
+ throw new Exception("No connection to the leading JobManager.");
+ }
}
catch (Exception e) {
throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
index c2c00c7..ce30122 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestOverviewHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.webmonitor.RequestStatusWithJobIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusWithJobIDsOverview;
+import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
import org.apache.flink.runtime.webmonitor.JsonFactory;
import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
@@ -36,29 +37,35 @@ import java.util.Map;
*/
public class RequestOverviewHandler implements RequestHandler, RequestHandler.JsonResponse {
- private final ActorGateway jobManager;
+ private final JobManagerArchiveRetriever retriever;
private final FiniteDuration timeout;
- public RequestOverviewHandler(ActorGateway jobManager) {
- this(jobManager, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
+ public RequestOverviewHandler(JobManagerArchiveRetriever retriever) {
+ this(retriever, WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
}
- public RequestOverviewHandler(ActorGateway jobManager, FiniteDuration timeout) {
- if (jobManager == null || timeout == null) {
+ public RequestOverviewHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+ if (retriever == null || timeout == null) {
throw new NullPointerException();
}
- this.jobManager = jobManager;
+ this.retriever = retriever;
this.timeout = timeout;
}
@Override
public String handleRequest(Map<String, String> params) throws Exception {
try {
- Future<Object> future = jobManager.ask(RequestStatusWithJobIDsOverview.getInstance(), timeout);
- StatusWithJobIDsOverview result = (StatusWithJobIDsOverview) Await.result(future, timeout);
- return JsonFactory.generateOverviewWithJobIDsJSON(result);
+ ActorGateway jobManager = retriever.getJobManagerGateway();
+
+ if (jobManager != null) {
+ Future<Object> future = jobManager.ask(RequestStatusWithJobIDsOverview.getInstance(), timeout);
+ StatusWithJobIDsOverview result = (StatusWithJobIDsOverview) Await.result(future, timeout);
+ return JsonFactory.generateOverviewWithJobIDsJSON(result);
+ } else {
+ throw new Exception("No connection to the leading job manager.");
+ }
}
catch (Exception e) {
throw new Exception("Failed to fetch the status overview: " + e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
index 9b52736..3f1842b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneou
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsNotFound;
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResultsStringified;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.webmonitor.JobManagerArchiveRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.StringUtils;
@@ -75,14 +76,15 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
private static final Charset ENCODING = Charset.forName("UTF-8");
/** Underlying JobManager */
- private final ActorGateway jobmanager;
- private final ActorGateway archive;
+ private final JobManagerArchiveRetriever retriever;
private final FiniteDuration timeout;
+ private ActorGateway jobmanager;
+ private ActorGateway archive;
- public JobManagerInfoHandler(ActorGateway jobmanager, ActorGateway archive, FiniteDuration timeout) {
- this.jobmanager = jobmanager;
- this.archive = archive;
+
+ public JobManagerInfoHandler(JobManagerArchiveRetriever retriever, FiniteDuration timeout) {
+ this.retriever = retriever;
this.timeout = timeout;
}
@@ -90,6 +92,18 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
DefaultFullHttpResponse response;
try {
+ jobmanager = retriever.getJobManagerGateway();
+
+ if (jobmanager == null) {
+ throw new Exception("No connection to leading JobManager.");
+ }
+
+ archive = retriever.getArchiveGateway();
+
+ if (archive == null) {
+ throw new Exception("No connection to leading JobManager.");
+ }
+
String result = handleRequest(routed);
byte[] bytes = result.getBytes(ENCODING);
@@ -114,6 +128,8 @@ public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
@SuppressWarnings("unchecked")
private String handleRequest(Routed routed) throws Exception {
+
+
if ("archive".equals(routed.queryParam("get"))) {
Future<Object> response = archive.ask(ArchiveMessages.getRequestArchivedJobs(), timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
index 9a9b6ba..a2978a1 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
@@ -45,14 +45,15 @@ public class TestRunner {
// start the cluster with the runtime monitor
Configuration configuration = new Configuration();
- configuration.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true);
+ configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
configuration.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
configuration.setString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY,
- "/data/repositories/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/resources/web-runtime-monitor");
+ "flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/resources/web-runtime-monitor");
LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, false);
+ cluster.start();
- final int port = cluster.getJobManagerRPCPort();
+ final int port = cluster.getLeaderRPCPort();
runWordCount(port);
runWebLogAnalysisExample(port);
runWordCount(port);
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/resources/log4j-test.properties b/flink-runtime-web/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2b9292a
--- /dev/null
+++ b/flink-runtime-web/src/test/resources/log4j-test.properties
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger=OFF, console
+
+# -----------------------------------------------------------------------------
+# Console (use 'console')
+# -----------------------------------------------------------------------------
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.flink.util.MavenForkNumberPrefixLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# -----------------------------------------------------------------------------
+# File (use 'file'); file name is taken from the mvn.forkNumber property
+# -----------------------------------------------------------------------------
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.file=${log.dir}/${mvn.forkNumber}.log
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime-web/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/resources/logback-test.xml b/flink-runtime-web/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..9d4f644
--- /dev/null
+++ b/flink-runtime-web/src/test/resources/logback-test.xml
@@ -0,0 +1,42 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] [%X{sourceThread} - %X{akkaSource}] %-5level %logger{60} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+ <!-- The following loggers are disabled during tests, because many tests deliberately
+ throw errors to test failure scenarios. Logging those would flood the log. -->
+ <!---->
+ <logger name="org.apache.flink.runtime.operators.DataSinkTask" level="OFF"/>
+ <logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/>
+ <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
+ <logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
+ <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>
+ <logger name="org.apache.flink.runtime.testingUtils" level="OFF"/>
+ <logger name="org.apache.flink.runtime.executiongraph.ExecutionGraph" level="OFF"/>
+ <logger name="org.apache.flink.runtime.jobmanager.EventCollector" level="OFF"/>
+ <logger name="org.apache.flink.runtime.instance.InstanceManager" level="OFF"/>
+</configuration>
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 6dc8d42..c5706a1 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -196,6 +196,12 @@ under the License.
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 0623862..7128286 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import java.util.UUID;
@@ -82,28 +81,27 @@ public abstract class FlinkUntypedActor extends UntypedActor {
private void handleLeaderSessionID(Object message) throws Exception {
if(message instanceof LeaderSessionMessage) {
LeaderSessionMessage msg = (LeaderSessionMessage) message;
+ UUID expectedID = getLeaderSessionID();
+ UUID actualID = msg.leaderSessionID();
- if(msg.leaderSessionID().isDefined() && getLeaderSessionID().isDefined()) {
- if(getLeaderSessionID().equals(msg.leaderSessionID())) {
- // finally call method to handle message
- handleMessage(msg.message());
- } else {
- handleDiscardedMessage(msg);
- }
+ if(expectedID == actualID || (expectedID != null && expectedID.equals(actualID))) {
+ handleMessage(msg.message());
} else {
- handleDiscardedMessage(msg);
+ handleDiscardedMessage(expectedID, msg);
}
} else if (message instanceof RequiresLeaderSessionID) {
throw new Exception("Received a message " + message + " without a leader session " +
- "ID, even though it requires to have one.");
+ "ID, even though the message requires a leader session ID.");
} else {
// call method to handle message
handleMessage(message);
}
}
- private void handleDiscardedMessage(Object msg) {
- LOG.debug("Discard message {} because the leader session ID was not correct.", msg);
+ private void handleDiscardedMessage(UUID expectedLeaderSessionID, LeaderSessionMessage msg) {
+ LOG.warn("Discard message {} because the expected leader session ID {} did not " +
+ "equal the received leader session ID {}.", msg, expectedLeaderSessionID,
+ msg.leaderSessionID());
}
/**
@@ -118,7 +116,7 @@ public abstract class FlinkUntypedActor extends UntypedActor {
* Returns the current leader session ID associcated with this actor.
* @return
*/
- protected abstract Option<UUID> getLeaderSessionID();
+ abstract protected UUID getLeaderSessionID();
/**
* This method should be called for every outgoing message. It wraps messages which require
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ListeningBehaviour.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ListeningBehaviour.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ListeningBehaviour.java
new file mode 100644
index 0000000..59e3f16
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ListeningBehaviour.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+/**
+ * Defines which messages the JobManager sends back to the JobClientActor after a job
+ * submission; each constant receives the messages of the previous one plus more.
+ */
+public enum ListeningBehaviour {
+ DETACHED, // only receive the Acknowledge message for the job submission
+ EXECUTION_RESULT, // additionally receive the SerializedJobExecutionResult
+ EXECUTION_RESULT_AND_STATE_CHANGES // additionally receive the JobStatusChanged messages
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 626d21f..ef2ef61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -258,6 +258,10 @@ public class BlobServer extends Thread implements BlobService {
LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.");
}
}
+
+ if(LOG.isInfoEnabled()) {
+ LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3681e9e..8f0b19b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -36,7 +36,6 @@ import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -576,7 +575,7 @@ public class CheckpointCoordinator {
public ActorGateway createJobStatusListener(
ActorSystem actorSystem,
long checkpointInterval,
- Option<UUID> leaderSessionID) {
+ UUID leaderSessionID) {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
index f65be15..7e32b72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import scala.Option;
import java.util.UUID;
@@ -34,16 +33,17 @@ public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
private final CheckpointCoordinator coordinator;
private final long interval;
- private final Option<UUID> leaderSessionID;
+ private final UUID leaderSessionID;
public CheckpointCoordinatorDeActivator(
CheckpointCoordinator coordinator,
long interval,
- Option<UUID> leaderSessionID) {
- Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");
- Preconditions.checkNotNull(leaderSessionID, "The leaderSesssionID must not be null.");
+ UUID leaderSessionID) {
+
+ LOG.info("Create CheckpointCoordinatorDeActivator");
+
+ this.coordinator = Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");
- this.coordinator = coordinator;
this.interval = interval;
this.leaderSessionID = leaderSessionID;
}
@@ -67,7 +67,7 @@ public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
}
@Override
- public Option<UUID> getLeaderSessionID() {
+ public UUID getLeaderSessionID() {
return leaderSessionID;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 7507643..9d64866 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobClientMessages;
@@ -120,6 +121,8 @@ public class JobClient {
* @param jobManagerGateway Gateway to the JobManager that should execute the job.
* @param jobGraph JobGraph describing the Flink job
* @param timeout Timeout for futures
+ * @param sysoutLogUpdates prints log updates to system out if true
+ * @param userCodeClassloader class loader to be used for deserialization
* @return The job execution result
* @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
* execution fails.
@@ -236,7 +239,10 @@ public class JobClient {
Object result;
try {
Future<Object> future = jobManagerGateway.ask(
- new JobManagerMessages.SubmitJob(jobGraph, false),
+ new JobManagerMessages.SubmitJob(
+ jobGraph,
+ ListeningBehaviour.DETACHED // only receive the Acknowledge for the job submission message
+ ),
timeout);
result = Await.result(future, timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index 16c6baf..bf747c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -24,12 +24,12 @@ import akka.actor.Status;
import akka.actor.Terminated;
import com.google.common.base.Preconditions;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.slf4j.Logger;
-import scala.Option;
import java.util.UUID;
@@ -44,18 +44,18 @@ public class JobClientActor extends FlinkUntypedActor {
private final boolean sysoutUpdates;
/** leader session ID of the JobManager when this actor was created */
- private final Option<UUID> leaderSessionID;
+ private final UUID leaderSessionID;
/** Actor which submits a job to the JobManager via this actor */
private ActorRef submitter;
public JobClientActor(ActorRef jobManager, Logger logger, boolean sysoutUpdates,
- Option<UUID> leaderSessionID) {
-
+ UUID leaderSessionID) {
+
this.jobManager = Preconditions.checkNotNull(jobManager, "The JobManager ActorRef must not be null.");
this.logger = Preconditions.checkNotNull(logger, "The logger must not be null.");
- this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID, "The leader session ID option must not be null.");
+ this.leaderSessionID = leaderSessionID;
this.sysoutUpdates = sysoutUpdates;
}
@@ -91,7 +91,11 @@ public class JobClientActor extends FlinkUntypedActor {
this.submitter = getSender();
jobManager.tell(
- decorateMessage(new JobManagerMessages.SubmitJob(jobGraph, true)), getSelf());
+ decorateMessage(
+ new JobManagerMessages.SubmitJob(
+ jobGraph,
+ ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
+ getSelf());
// make sure we notify the sender when the connection got lost
getContext().watch(jobManager);
@@ -102,8 +106,7 @@ public class JobClientActor extends FlinkUntypedActor {
String msg = "Received repeated 'SubmitJobAndWait'";
logger.error(msg);
getSender().tell(
- decorateMessage(new Status.Failure(new Exception(msg))),
- ActorRef.noSender());
+ decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender());
getContext().unwatch(jobManager);
getSelf().tell(decorateMessage(PoisonPill.getInstance()), ActorRef.noSender());
@@ -153,7 +156,7 @@ public class JobClientActor extends FlinkUntypedActor {
}
@Override
- protected Option<UUID> getLeaderSessionID() {
+ protected UUID getLeaderSessionID() {
return leaderSessionID;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 189682b..4cee2f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -556,7 +556,9 @@ public class Execution implements Serializable {
partitionId, partitionLocation);
final UpdatePartitionInfo updateTaskMessage = new UpdateTaskSinglePartitionInfo(
- consumer.getAttemptId(), partition.getIntermediateResult().getId(), descriptor);
+ consumer.getAttemptId(),
+ partition.getIntermediateResult().getId(),
+ descriptor);
sendUpdatePartitionInfoRpcCall(consumerSlot, updateTaskMessage);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3602372..cde1741 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -51,7 +51,6 @@ import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
@@ -326,7 +325,7 @@ public class ExecutionGraph implements Serializable {
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
ActorSystem actorSystem,
- Option<UUID> leaderSessionID) {
+ UUID leaderSessionID) {
// simple sanity checks
if (interval < 10 || checkpointTimeout < 10) {
throw new IllegalArgumentException();
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
index fe4a1cd..a82debb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java
@@ -19,11 +19,11 @@
package org.apache.flink.runtime.instance;
import akka.actor.ActorRef;
-import scala.Option;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
+import java.io.Serializable;
import java.util.UUID;
/**
@@ -31,7 +31,7 @@ import java.util.UUID;
*
* It allows to avoid direct interaction with an ActorRef.
*/
-public interface ActorGateway {
+public interface ActorGateway extends Serializable {
/**
* Sends a message asynchronously and returns its response. The response to the message is
@@ -99,9 +99,9 @@ public interface ActorGateway {
ActorRef actor();
/**
- * Returns the leaderSessionID associated with the remote actor or None.
+ * Returns the leaderSessionID associated with the remote actor or null.
*
+ * @return Leader session ID if it is associated with this gateway, otherwise null
+ * @return Leader session ID if its associated with this gateway, otherwise null
*/
- Option<UUID> leaderSessionID();
+ UUID leaderSessionID();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
index ea55458..c00e7fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java
@@ -21,31 +21,33 @@ package org.apache.flink.runtime.instance;
import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
-import org.apache.flink.runtime.LeaderSessionMessageDecorator;
-import org.apache.flink.runtime.MessageDecorator;
import org.apache.flink.runtime.akka.AkkaUtils;
-import scala.Option;
+import org.apache.flink.runtime.messages.LeaderSessionMessageDecorator;
+import org.apache.flink.runtime.messages.MessageDecorator;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
+import java.io.Serializable;
import java.util.UUID;
/**
* Concrete {@link ActorGateway} implementation which uses Akka to communicate with remote actors.
*/
-public class AkkaActorGateway implements ActorGateway {
+public class AkkaActorGateway implements ActorGateway, Serializable {
+
+ private static final long serialVersionUID = 42l;
// ActorRef of the remote instance
private final ActorRef actor;
// Associated leader session ID, which is used for RequiresLeaderSessionID messages
- private final Option<UUID> leaderSessionID;
+ private final UUID leaderSessionID;
// Decorator for messages
private final MessageDecorator decorator;
- public AkkaActorGateway(ActorRef actor, Option<UUID> leaderSessionID) {
+ public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) {
this.actor = actor;
this.leaderSessionID = leaderSessionID;
// we want to wrap RequiresLeaderSessionID messages in a LeaderSessionMessage
@@ -151,7 +153,7 @@ public class AkkaActorGateway implements ActorGateway {
}
@Override
- public Option<UUID> leaderSessionID() {
+ public UUID leaderSessionID() {
return leaderSessionID;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 0a6b4d0..03213e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -30,7 +30,6 @@ import java.util.UUID;
import akka.actor.ActorRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
/**
* Simple manager that keeps track of which TaskManager are available and alive.
@@ -143,7 +142,7 @@ public class InstanceManager {
InstanceConnectionInfo connectionInfo,
HardwareDescription resources,
int numberOfSlots,
- Option<UUID> leaderSessionID){
+ UUID leaderSessionID){
synchronized(this.lock){
if (this.isShutdown) {
throw new IllegalStateException("InstanceManager is shut down.");
@@ -177,8 +176,14 @@ public class InstanceManager {
totalNumberOfAliveTaskSlots += numberOfSlots;
if (LOG.isInfoEnabled()) {
- LOG.info(String.format("Registered TaskManager at %s (%s) as %s. Current number of registered hosts is %d.",
- connectionInfo.getHostname(), taskManager.path(), id, registeredHostsById.size()));
+ LOG.info(String.format("Registered TaskManager at %s (%s) as %s. " +
+ "Current number of registered hosts is %d. " +
+ "Current number of alive task slots is %d.",
+ connectionInfo.getHostname(),
+ taskManager.path(),
+ id,
+ registeredHostsById.size(),
+ totalNumberOfAliveTaskSlots));
}
host.reportHeartBeat();
@@ -190,13 +195,22 @@ public class InstanceManager {
}
}
- public void unregisterTaskManager(ActorRef taskManager){
+ /**
+ * Unregisters the TaskManager with the given {@link ActorRef}: marks its instance as dead and
+ * notifies {@link InstanceListener} about the dead instance. Does nothing if it is not registered.
+ * @param taskManager TaskManager which is about to be marked dead.
+ * @param terminated If true, {@code taskManager} is additionally added to {@code deadHosts}.
+ */
+ public void unregisterTaskManager(ActorRef taskManager, boolean terminated){
Instance host = registeredHostsByConnection.get(taskManager);
if(host != null){
registeredHostsByConnection.remove(taskManager);
registeredHostsById.remove(host.getId());
- deadHosts.add(taskManager);
+
+ if (terminated) {
+ deadHosts.add(taskManager);
+ }
host.markDead();
@@ -204,12 +218,30 @@ public class InstanceManager {
notifyDeadInstance(host);
- LOG.info("Unregistered task manager " + taskManager.path().address() + ". Number of " +
+ LOG.info("Unregistered task manager " + taskManager.path() + ". Number of " +
"registered task managers " + getNumberOfRegisteredTaskManagers() + ". Number" +
" of available slots " + getTotalNumberOfSlots() + ".");
}
}
+ /**
+ * Unregisters all currently registered TaskManagers from the InstanceManager.
+ *
+ * <p>For every instance in {@code registeredHostsById}, its actor is added to
+ * {@code deadHosts}, the instance is marked dead, its slots are subtracted from
+ * {@code totalNumberOfAliveTaskSlots}, and {@code notifyDeadInstance} is called for it.
+ * Afterwards {@code registeredHostsById} and {@code registeredHostsByConnection} are cleared.
+ * Unlike {@code unregisterTaskManager(ActorRef, boolean)}, the actor is always added to
+ * {@code deadHosts}.
+ */
+ public void unregisterAllTaskManagers() {
+ // NOTE(review): registerTaskManager mutates these registries inside synchronized(this.lock),
+ // but this method does not take the lock -- confirm that callers serialize access.
+ for(Instance instance: registeredHostsById.values()) {
+ deadHosts.add(instance.getActorGateway().actor());
+
+ instance.markDead();
+
+ totalNumberOfAliveTaskSlots -= instance.getTotalNumberOfSlots();
+
+ notifyDeadInstance(instance);
+ }
+
+ registeredHostsById.clear();
+ registeredHostsByConnection.clear();
+ }
+
public boolean isRegistered(ActorRef taskManager) {
return registeredHostsByConnection.containsKey(taskManager);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 539dbc0..9eb4a9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import org.apache.flink.runtime.messages.TaskMessages.FailTask;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -51,7 +52,6 @@ import java.io.IOException;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
-import static org.apache.flink.runtime.messages.TaskMessages.FailTask;
/**
* Network I/O components of each {@link TaskManager} instance. The network environment contains
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
index edfa87b..1202499 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
@@ -35,6 +35,8 @@ public class JobManagerCliOptions {
private String host;
+ private int webUIPort = -1;
+
// ------------------------------------------------------------------------
public String getConfigDir() {
@@ -86,4 +88,23 @@ public class JobManagerCliOptions {
public void setHost(String host) {
this.host = checkNotNull(host);
}
+
+ /**
+ * Returns the port for the web UI as set via {@link #setWebUIPort(int)}, or -1 if it has
+ * not been set.
+ *
+ * @return The web UI port, or -1 if unset
+ */
+ public int getWebUIPort() {
+ return webUIPort;
+ }
+
+ /**
+ * Sets the port for the web UI. Unlike {@link #setHost(String)}, the value is not validated.
+ *
+ * @param webUIPort The port for the web UI
+ */
+ public void setWebUIPort(int webUIPort) {
+ this.webUIPort = webUIPort;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
new file mode 100644
index 0000000..90a1147
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+/**
+ * Recovery mode for Flink's cluster execution. Currently supported modes are:
+ * <ul>
+ * <li>{@link #STANDALONE}: No recovery from JobManager failures.</li>
+ * <li>{@link #ZOOKEEPER}: JobManager high availability via ZooKeeper. ZooKeeper is used
+ * to elect a leader among a group of JobManagers. The leader is responsible for the job
+ * execution. Upon failure of the leader, a new leader is elected which takes over the
+ * responsibilities of the old leader.</li></ul>
+ */
+public enum RecoveryMode {
+ STANDALONE,
+ ZOOKEEPER
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 4383b65..21a1f51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -22,30 +22,41 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
+import java.util.UUID;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import com.google.common.base.Preconditions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
/**
* This class sets up a web-server that contains a web frontend to display information about running jobs.
* It instantiates and configures an embedded jetty server.
*/
-public class WebInfoServer implements WebMonitor {
+public class WebInfoServer implements WebMonitor, LeaderRetrievalListener {
/** Web root dir in the jar */
private static final String WEB_ROOT_DIR = "web-docs-infoserver";
@@ -56,6 +67,30 @@ public class WebInfoServer implements WebMonitor {
/** The jetty server serving all requests. */
private final Server server;
+ /** Retrieval service for the current leading JobManager */
+ private final LeaderRetrievalService leaderRetrievalService;
+
+ /** ActorSystem used to retrieve the ActorRefs */
+ private final ActorSystem actorSystem;
+
+ /** Collection for the registered jetty handlers */
+ private final HandlerCollection handlers;
+
+ /** Associated configuration */
+ private final Configuration config;
+
+ /** Timeout for the servlets */
+ private final FiniteDuration timeout;
+
+ /** Actor look up timeout */
+ private final FiniteDuration lookupTimeout;
+
+ /** Default jetty handler responsible for serving static content */
+ private final ResourceHandler resourceHandler;
+
+ /** File paths to log dirs */
+ final File[] logDirFiles;
+
/** The assigned port where jetty is running. */
private int assignedPort = -1;
@@ -64,19 +99,23 @@ public class WebInfoServer implements WebMonitor {
* to list all present information concerning the job manager
*
* @param config The Flink configuration.
- * @param jobmanager The ActorRef to the JobManager actor
- * @param archive The ActorRef to the archive for old jobs
+ * @param leaderRetrievalService Retrieval service to obtain the current leader
*
* @throws IOException
* Thrown, if the server setup failed for an I/O related reason.
*/
- public WebInfoServer(Configuration config, ActorGateway jobmanager, ActorGateway archive) throws IOException {
+ public WebInfoServer(
+ Configuration config,
+ LeaderRetrievalService leaderRetrievalService,
+ ActorSystem actorSystem)
+ throws IOException {
if (config == null) {
throw new IllegalArgumentException("No Configuration has been passed to the web server");
}
- if (jobmanager == null || archive == null) {
- throw new NullPointerException();
- }
+
+ this.config = config;
+
+ this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
// if port == 0, jetty will assign an available port.
int port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
@@ -85,7 +124,10 @@ public class WebInfoServer implements WebMonitor {
throw new IllegalArgumentException("Invalid port for the webserver: " + port);
}
- final FiniteDuration timeout = AkkaUtils.getTimeout(config);
+ timeout = AkkaUtils.getTimeout(config);
+ lookupTimeout = AkkaUtils.getLookupTimeout(config);
+
+ this.actorSystem = actorSystem;
// get base path of Flink installation
final String basePath = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");
@@ -99,7 +141,7 @@ public class WebInfoServer implements WebMonitor {
"resource " + WEB_ROOT_DIR + " is not included in the jar.");
}
- final File[] logDirFiles = new File[logDirPaths.length];
+ logDirFiles = new File[logDirPaths.length];
int i = 0;
for(String path : logDirPaths) {
logDirFiles[i++] = new File(path);
@@ -113,24 +155,16 @@ public class WebInfoServer implements WebMonitor {
server = new Server(port);
- // ----- the handlers for the servlets -----
- ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
- servletContext.setContextPath("/");
- servletContext.addServlet(new ServletHolder(new JobManagerInfoServlet(jobmanager, archive, timeout)), "/jobsInfo");
- servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo");
- servletContext.addServlet(new ServletHolder(new SetupInfoServlet(config, jobmanager, timeout)), "/setupInfo");
- servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu");
-
-
// ----- the handler serving all the static files -----
- ResourceHandler resourceHandler = new ResourceHandler();
+ resourceHandler = new ResourceHandler();
resourceHandler.setDirectoriesListed(false);
resourceHandler.setResourceBase(webRootDir.toExternalForm());
// ----- add the handlers to the list handler -----
- HandlerList handlers = new HandlerList();
+
+ // make the HandlerCollection mutable so that we can update it later on
+ handlers = new HandlerCollection(true);
handlers.addHandler(resourceHandler);
- handlers.addHandler(servletContext);
server.setHandler(handlers);
}
@@ -158,12 +192,15 @@ public class WebInfoServer implements WebMonitor {
else {
LOG.warn("Unable to determine local endpoint of web frontend server");
}
+
+ leaderRetrievalService.start(this);
}
/**
* Stop the webserver
*/
public void stop() throws Exception {
+ leaderRetrievalService.stop();
server.stop();
assignedPort = -1;
}
@@ -171,4 +208,118 @@ public class WebInfoServer implements WebMonitor {
public int getServerPort() {
return this.assignedPort;
}
+
+ /**
+ * Callback from the {@link LeaderRetrievalService} with the current leader's address and
+ * session ID.
+ *
+ * <p>For a non-null, non-empty address, the JobManager actor is resolved (using
+ * {@code lookupTimeout}), its archive is requested and awaited (using {@code timeout}),
+ * and the servlet handlers are rebuilt against the new leader. A null or empty address is
+ * ignored, so the currently installed handlers stay in place. Any exception is passed to
+ * {@link #handleError(Exception)}, which stops the web server.
+ *
+ * @param leaderAddress Address of the leading JobManager, resolved via
+ * {@code AkkaUtils.getActorRef}; may be null or empty
+ * @param leaderSessionID Leader session ID attached to the JobManager and archive gateways
+ */
+ @Override
+ public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+
+ if(leaderAddress != null && !leaderAddress.equals("")) {
+ try {
+ // NOTE(review): Await.result below (and presumably the actor lookup) blocks the
+ // calling thread -- confirm that is acceptable for LeaderRetrievalService callbacks.
+ ActorRef jobManager = AkkaUtils.getActorRef(
+ leaderAddress,
+ actorSystem,
+ lookupTimeout);
+ ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);
+
+ Future<Object> archiveFuture = jobManagerGateway.ask(
+ JobManagerMessages.getRequestArchive(),
+ timeout);
+
+ ActorRef archive = ((JobManagerMessages.ResponseArchive) Await.result(
+ archiveFuture,
+ timeout)).actor();
+
+ ActorGateway archiveGateway = new AkkaActorGateway(archive, leaderSessionID);
+
+ updateHandler(jobManagerGateway, archiveGateway);
+ } catch (Exception e) {
+ handleError(e);
+ }
+ }
+ }
+
+ /**
+ * Handles an error reported by the {@link LeaderRetrievalService}, or raised while switching
+ * to a new leader in {@link #notifyLeaderAddress(String, UUID)}, by logging it and stopping
+ * the whole web server via {@link #stop()} (which also stops the leader retrieval service).
+ * Exceptions thrown while stopping are logged, not rethrown.
+ *
+ * @param exception The error to handle
+ */
+ @Override
+ public void handleError(Exception exception) {
+ LOG.error("Received error from LeaderRetrievalService.", exception);
+
+ try{
+ // stop the whole web server
+ // NOTE(review): stop() also calls leaderRetrievalService.stop(); confirm the service
+ // tolerates being stopped from within its own callback.
+ stop();
+ } catch (Exception e) {
+ LOG.error("Error while stopping the web server due to a LeaderRetrievalService error.", e);
+ }
+ }
+
+ /**
+ * Updates the Flink handlers with the current leading JobManager and archive.
+ *
+ * <p>Builds a fresh {@link ServletContextHandler} serving /jobsInfo, /logInfo, /setupInfo and
+ * /menu, replaces the contents of the mutable {@link HandlerCollection} with the static
+ * resource handler plus this new context, and then starts the new context.
+ *
+ * @param jobManager ActorGateway to the current JobManager leader
+ * @param archive ActorGateway to the current archive of the leading JobManager
+ * @throws Exception If creating or starting the new servlet context fails
+ */
+ private void updateHandler(ActorGateway jobManager, ActorGateway archive) throws Exception {
+ // ----- the handlers for the servlets -----
+ ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ servletContext.setContextPath("/");
+ servletContext.addServlet(
+ new ServletHolder(
+ new JobManagerInfoServlet(
+ jobManager,
+ archive,
+ timeout)),
+ "/jobsInfo");
+ servletContext.addServlet(
+ new ServletHolder(
+ new LogfileInfoServlet(
+ logDirFiles)),
+ "/logInfo");
+ servletContext.addServlet(
+ new ServletHolder(
+ new SetupInfoServlet(
+ config,
+ jobManager,
+ timeout)),
+ "/setupInfo");
+ servletContext.addServlet(
+ new ServletHolder(
+ new MenuServlet()),
+ "/menu");
+
+ // replace old handlers with new ones
+ // NOTE(review): the previously installed servlet context is not stopped here, and the new
+ // one is installed before start() is called -- confirm Jetty handles both as intended.
+ handlers.setHandlers(new Handler[]{resourceHandler, servletContext});
+
+ // start new handler
+ servletContext.start();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderContender.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderContender.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderContender.java
new file mode 100644
index 0000000..dcf0a4e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderContender.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+
+/**
+ * Interface which has to be implemented to take part in the leader election process of the
+ * {@link LeaderElectionService}.
+ */
+public interface LeaderContender {
+
+ /**
+ * Callback method which is called by the {@link LeaderElectionService} upon selecting this
+ * instance as the new leader. The method is called with the new leader session ID.
+ *
+ * @param leaderSessionID The new leader session ID
+ */
+ void grantLeadership(UUID leaderSessionID);
+
+ /**
+ * Callback method which is called by the {@link LeaderElectionService} upon revoking the
+ * leadership of a former leader. This might happen, for example, when multiple contenders
+ * have been granted leadership.
+ */
+ void revokeLeadership();
+
+ /**
+ * Returns the address of this {@link LeaderContender} under which other instances can connect
+ * to it.
+ *
+ * @return Address of this contender.
+ */
+ String getAddress();
+
+ /**
+ * Callback method which is called by the {@link LeaderElectionService} in case of an error in
+ * the service thread.
+ *
+ * @param exception The exception caught in the service thread
+ */
+ void handleError(Exception exception);
+}