Posted to commits@flink.apache.org by se...@apache.org on 2015/07/08 20:31:12 UTC
[5/9] flink git commit: [FLINK-2288] [runtime] Cleanups and comments for ZooKeeper based initialization
[FLINK-2288] [runtime] Cleanups and comments for ZooKeeper based initialization
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/535475c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/535475c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/535475c2
Branch: refs/heads/master
Commit: 535475c2131b9bc352bc7268f022a1bdce206f2e
Parents: 9c0dd97
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 8 15:02:33 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200
----------------------------------------------------------------------
docs/setup/jobmanager_high_availability.md | 2 +-
flink-dist/src/main/resources/flink-conf.yaml | 37 +++++++++++++
.../flink/runtime/security/SecurityUtils.java | 10 +++-
.../zookeeper/FlinkZooKeeperQuorumPeer.java | 26 +++++----
.../flink/runtime/jobmanager/JobManager.scala | 58 +++++++++++---------
5 files changed, 92 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/docs/setup/jobmanager_high_availability.md
----------------------------------------------------------------------
diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md
index dec0cdc..8958e17 100644
--- a/docs/setup/jobmanager_high_availability.md
+++ b/docs/setup/jobmanager_high_availability.md
@@ -77,7 +77,7 @@ server.X=addressX:peerPort:leaderPort
server.Y=addressY:peerPort:leaderPort
</pre>
-The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some rqeuired configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation.
+The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some required configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation.
## Example: Start and stop a local HA-cluster with 2 JobManagers
http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index a258815..2a287dc 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -21,18 +21,38 @@
# Common
#==============================================================================
+# The host on which the JobManager runs. Only used in non-high-availability mode.
+# The JobManager process will use this hostname to bind the listening servers to.
+# The TaskManagers will try to connect to the JobManager on that host.
+
jobmanager.rpc.address: localhost
+
+# The port where the JobManager's main actor system listens for messages.
+
jobmanager.rpc.port: 6123
+
+# The heap size for the JobManager JVM
+
jobmanager.heap.mb: 256
+
+# The heap size for the TaskManager JVM
+
taskmanager.heap.mb: 512
+
+# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
+
taskmanager.numberOfTaskSlots: 1
+
+# The parallelism used for programs that did not specify any other parallelism.
+
parallelism.default: 1
+
#==============================================================================
# Web Frontend
#==============================================================================
@@ -42,11 +62,13 @@ parallelism.default: 1
jobmanager.web.port: 8081
+
# The port under which the standalone web client
# (for job upload and submit) listens.
webclient.port: 8080
+
#==============================================================================
# Streaming state checkpointing
#==============================================================================
@@ -58,12 +80,14 @@ webclient.port: 8080
state.backend: jobmanager
+
# Directory for storing checkpoints in a flink supported filesystem
# Note: State backend must be accessible from the JobManager, use file://
# only for local setups.
#
# state.backend.fs.checkpointdir: hdfs://checkpoints
+
#==============================================================================
# Advanced
#==============================================================================
@@ -72,6 +96,7 @@ state.backend: jobmanager
#
# taskmanager.network.numberOfBuffers: 2048
+
# Directories for temporary files.
#
# Add a delimited list for multiple directories, using the system directory
@@ -88,6 +113,7 @@ state.backend: jobmanager
#
# taskmanager.tmp.dirs: /tmp
+
# Path to the Hadoop configuration directory.
#
# This configuration is used when writing into HDFS. Unless specified otherwise,
@@ -98,3 +124,14 @@ state.backend: jobmanager
# via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'.
#
# fs.hdfs.hadoopconf: /path/to/hadoop/conf/
+
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form
+# "host_1[:peerPort[:leaderPort]],host_2[:peerPort[:leaderPort]],..."
+
+#ha.zookeeper.quorum: localhost
\ No newline at end of file
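The quorum format documented above for `ha.zookeeper.quorum`, "host_1[:peerPort[:leaderPort]],host_2[:peerPort[:leaderPort]],...", can be illustrated with a small parser. This is only a sketch: `QuorumPeers` and `Peer` are hypothetical names that are not part of this commit, and the fallback ports 2888 and 3888 are an assumption based on ZooKeeper's conventional defaults, not something the diff states.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Flink) that parses "host[:peerPort[:leaderPort]],...". */
final class QuorumPeers {

    // Assumed defaults, taken from ZooKeeper's conventional ports.
    static final int DEFAULT_PEER_PORT = 2888;
    static final int DEFAULT_LEADER_PORT = 3888;

    /** One quorum peer, printable in the zoo.cfg form address:peerPort:leaderPort. */
    static final class Peer {
        final String host;
        final int peerPort;
        final int leaderPort;

        Peer(String host, int peerPort, int leaderPort) {
            this.host = host;
            this.peerPort = peerPort;
            this.leaderPort = leaderPort;
        }

        @Override
        public String toString() {
            return host + ":" + peerPort + ":" + leaderPort;
        }
    }

    /** Splits the comma-separated list and fills in missing ports with the defaults. */
    static List<Peer> parse(String quorum) {
        List<Peer> peers = new ArrayList<>();
        for (String entry : quorum.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length > 3 || parts[0].isEmpty()) {
                throw new IllegalArgumentException("Invalid quorum peer: '" + entry + "'");
            }
            int peerPort = parts.length > 1 ? Integer.parseInt(parts[1]) : DEFAULT_PEER_PORT;
            int leaderPort = parts.length > 2 ? Integer.parseInt(parts[2]) : DEFAULT_LEADER_PORT;
            peers.add(new Peer(parts[0], peerPort, leaderPort));
        }
        return peers;
    }
}
```

With these assumptions, `QuorumPeers.parse("localhost")` yields a single peer that prints as `localhost:2888:3888`, which matches the `server.X=addressX:peerPort:leaderPort` shape used in `conf/zoo.cfg`.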
http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index 5fde51e..4c0d49a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -24,6 +24,12 @@ import org.slf4j.LoggerFactory;
import java.security.PrivilegedExceptionAction;
+/**
+ * A utility class that lets program code run in a security context provided by the
+ * Hadoop security user groups.
+ *
+ * The secure context will for example pick up authentication information from Kerberos.
+ */
public class SecurityUtils {
private static final Logger LOG = LoggerFactory.getLogger(SecurityUtils.class);
@@ -44,17 +50,15 @@ public class SecurityUtils {
LOG.error("Security is enabled but no Kerberos credentials have been found. " +
"You may authenticate using the kinit command.");
}
- T ret = ugi.doAs(new PrivilegedExceptionAction<T>() {
+ return ugi.doAs(new PrivilegedExceptionAction<T>() {
@Override
public T run() throws Exception {
return runner.run();
}
});
- return ret;
}
public static interface FlinkSecuredRunner<T> {
public T run() throws Exception;
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
index d0e706e..c9d3ec4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
@@ -21,20 +21,22 @@ package org.apache.flink.runtime.zookeeper;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
-import java.io.PrintWriter;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
@@ -49,6 +51,10 @@ public class FlinkZooKeeperQuorumPeer {
public static void main(String[] args) {
try {
+ // startup checks and logging
+ EnvironmentInformation.logEnvironmentInfo(LOG, "ZooKeeper Quorum Peer", args);
+ EnvironmentInformation.checkJavaVersion();
+
final ParameterTool params = ParameterTool.fromArgs(args);
final String zkConfigFile = params.getRequired("zkConfigFile");
final int peerId = params.getInt("peerId");
@@ -57,7 +63,7 @@ public class FlinkZooKeeperQuorumPeer {
runFlinkZkQuorumPeer(zkConfigFile, peerId);
}
catch (Throwable t) {
- t.printStackTrace();
+ LOG.error("Error running ZooKeeper quorum peer: " + t.getMessage(), t);
System.exit(-1);
}
}
@@ -210,18 +216,16 @@ public class FlinkZooKeeperQuorumPeer {
dataDir.deleteOnExit();
- // Write myid to file
- PrintWriter writer = null;
+ LOG.info("Writing {} to myid file in 'dataDir'.", id);
+
+ // Write myid to file. We use a File Writer, because that properly propagates errors,
+ // while the PrintWriter swallows errors
+ FileWriter writer = new FileWriter(new File(dataDir, "myid"));
try {
- LOG.info("Writing {} to myid file in 'dataDir'.", id);
-
- writer = new PrintWriter(new File(dataDir, "myid"));
- writer.println(id);
+ writer.write(String.valueOf(id));
}
finally {
- if (writer != null) {
- writer.close();
- }
+ writer.close();
}
}
}
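The comment in the hunk above says that `PrintWriter` swallows errors while `FileWriter` propagates them. The following sketch illustrates that difference in isolation. `MyIdFile` and `FailingWriter` are hypothetical names introduced only for this example, and the try-with-resources form is a stylistic substitute for the try/finally block used in the commit.

```java
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

/** Hypothetical illustration, not the code from FlinkZooKeeperQuorumPeer. */
final class MyIdFile {

    /** Writes the peer id to dataDir/myid; any I/O failure surfaces as an IOException. */
    static void writeMyId(File dataDir, int id) throws IOException {
        try (FileWriter writer = new FileWriter(new File(dataDir, "myid"))) {
            writer.write(String.valueOf(id));
        }
    }

    /** Reads the myid file back as a string. */
    static String readMyId(File dataDir) throws IOException {
        byte[] bytes = Files.readAllBytes(new File(dataDir, "myid").toPath());
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** A writer whose every write fails, standing in for an unwritable target. */
    static final class FailingWriter extends Writer {
        @Override
        public void write(char[] buf, int off, int len) throws IOException {
            throw new IOException("simulated write failure");
        }

        @Override
        public void flush() {}

        @Override
        public void close() {}
    }

    /**
     * PrintWriter catches the IOException internally: println returns normally
     * and the failure is only visible through checkError().
     */
    static boolean printWriterSwallowsError() {
        PrintWriter writer = new PrintWriter(new FailingWriter());
        writer.println(1);
        return writer.checkError();
    }
}
```

A caller that uses `PrintWriter` and never calls `checkError()` would not notice the failed write, which is the situation the change to `FileWriter` avoids.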
http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0d96edb..dc1599a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1000,35 +1000,41 @@ object JobManager {
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
}
- // HA mode
- val (hostname, port) = if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
- // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is
- // chosen. For the FlinkMiniCluster you have to choose it on your own.
- LOG.info("HA mode.")
-
- if (config.getHost == null) {
- throw new Exception("Missing parameter '--host'.")
+ // high availability mode
+ val (hostname: String, port: Int ) =
+ if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
+ // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is
+ // chosen. For the FlinkMiniCluster you have to choose it on your own.
+ LOG.info("Starting JobManager in High-Availability Mode")
+
+ if (config.getHost() == null) {
+ throw new Exception("Missing parameter '--host'. Parameter is required when " +
+ "running in high-availability mode")
+ }
+
+ // Let web server listen on random port
+ configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
+
+ (config.getHost(), 0)
}
-
- // Let web server listen on random port
- configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-
- (config.getHost, NetUtils.getAvailablePort)
- }
- else {
- if (config.getHost != null) {
- throw new IllegalStateException("Specified explicit address for JobManager communication " +
- "via CLI, but no ZooKeeper quorum has been configured. The task managers will not be " +
- "able to find the correct JobManager to connect to. Please configure ZooKeeper or " +
- "don't set the address explicitly (this will fallback to the address configured in " +
- "in 'conf/flink-conf.yaml'.")
+ else {
+ LOG.info("Starting JobManager without high-availability")
+
+ if (config.getHost() != null) {
+ throw new Exception("Found an explicit address for JobManager communication " +
+ "via the CLI option '--host'.\n" +
+ "This parameter must only be set if the JobManager is started in high-availability " +
+ "mode and connects to a ZooKeeper quorum.\n" +
+ "Please configure ZooKeeper or don't set the '--host' option, so that the JobManager " +
+ "uses the address configured under 'conf/flink-conf.yaml'.")
+ }
+
+ val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+ val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+ (host, port)
}
- (configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null),
- configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT))
- }
-
(configuration, config.getJobManagerMode(), config.getStreamingMode(), hostname, port)
}