You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/08 20:31:12 UTC

[5/9] flink git commit: [FLINK-2288] [runtime] Cleanups and comments for ZooKeeper based initialization

[FLINK-2288] [runtime] Cleanups and comments for ZooKeeper based initialization


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/535475c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/535475c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/535475c2

Branch: refs/heads/master
Commit: 535475c2131b9bc352bc7268f022a1bdce206f2e
Parents: 9c0dd97
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 8 15:02:33 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 docs/setup/jobmanager_high_availability.md      |  2 +-
 flink-dist/src/main/resources/flink-conf.yaml   | 37 +++++++++++++
 .../flink/runtime/security/SecurityUtils.java   | 10 +++-
 .../zookeeper/FlinkZooKeeperQuorumPeer.java     | 26 +++++----
 .../flink/runtime/jobmanager/JobManager.scala   | 58 +++++++++++---------
 5 files changed, 92 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/docs/setup/jobmanager_high_availability.md
----------------------------------------------------------------------
diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md
index dec0cdc..8958e17 100644
--- a/docs/setup/jobmanager_high_availability.md
+++ b/docs/setup/jobmanager_high_availability.md
@@ -77,7 +77,7 @@ server.X=addressX:peerPort:leaderPort
 server.Y=addressY:peerPort:leaderPort
 </pre>
 
-The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some rqeuired configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation.
+The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some required configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation.
 
 ## Example: Start and stop a local HA-cluster with 2 JobManagers
 

http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index a258815..2a287dc 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -21,18 +21,38 @@
 # Common
 #==============================================================================
 
+# The host on which the JobManager runs. Only used in non-high-availability mode.
+# The JobManager process will use this hostname to bind the listening servers to.
+# The TaskManagers will try to connect to the JobManager on that host.
+
 jobmanager.rpc.address: localhost
 
+
+# The port where the JobManager's main actor system listens for messages.
+
 jobmanager.rpc.port: 6123
 
+
+# The heap size for the JobManager JVM
+
 jobmanager.heap.mb: 256
 
+
+# The heap size for the TaskManager JVM
+
 taskmanager.heap.mb: 512
 
+
+# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
+
 taskmanager.numberOfTaskSlots: 1
 
+
+# The parallelism used for programs that did not specify any other parallelism.
+
 parallelism.default: 1
 
+
 #==============================================================================
 # Web Frontend
 #==============================================================================
@@ -42,11 +62,13 @@ parallelism.default: 1
 
 jobmanager.web.port: 8081
 
+
 # The port uder which the standalone web client
 # (for job upload and submit) listens.
 
 webclient.port: 8080
 
+
 #==============================================================================
 # Streaming state checkpointing
 #==============================================================================
@@ -58,12 +80,14 @@ webclient.port: 8080
 
 state.backend: jobmanager
 
+
 # Directory for storing checkpoints in a flink supported filesystem
 # Note: State backend must be accessible from the JobManager, use file://
 # only for local setups. 
 #
 # state.backend.fs.checkpointdir: hdfs://checkpoints
 
+
 #==============================================================================
 # Advanced
 #==============================================================================
@@ -72,6 +96,7 @@ state.backend: jobmanager
 #
 # taskmanager.network.numberOfBuffers: 2048
 
+
 # Directories for temporary files.
 #
 # Add a delimited list for multiple directories, using the system directory
@@ -88,6 +113,7 @@ state.backend: jobmanager
 #
 # taskmanager.tmp.dirs: /tmp
 
+
 # Path to the Hadoop configuration directory.
 #
 # This configuration is used when writing into HDFS. Unless specified otherwise,
@@ -98,3 +124,14 @@ state.backend: jobmanager
 # via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'.
 #
 # fs.hdfs.hadoopconf: /path/to/hadoop/conf/
+
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form
+# "host_1[:peerPort[:leaderPort]],host_2[:peerPort[:leaderPort]],..."
+
+#ha.zookeeper.quorum: localhost
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index 5fde51e..4c0d49a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -24,6 +24,12 @@ import org.slf4j.LoggerFactory;
 
 import java.security.PrivilegedExceptionAction;
 
+/**
+ * A utility class that lets program code run in a security context provided by the
+ * Hadoop security user groups.
+ * 
+ * The secure context will, for example, pick up authentication information from Kerberos.
+ */
 public class SecurityUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SecurityUtils.class);
@@ -44,17 +50,15 @@ public class SecurityUtils {
 			LOG.error("Security is enabled but no Kerberos credentials have been found. " +
 						"You may authenticate using the kinit command.");
 		}
-		T ret = ugi.doAs(new PrivilegedExceptionAction<T>() {
+		return ugi.doAs(new PrivilegedExceptionAction<T>() {
 			@Override
 			public T run() throws Exception {
 				return runner.run();
 			}
 		});
-		return ret;
 	}
 
 	public static interface FlinkSecuredRunner<T> {
 		public T run() throws Exception;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
index d0e706e..c9d3ec4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
@@ -21,20 +21,22 @@ package org.apache.flink.runtime.zookeeper;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.PrintWriter;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
@@ -49,6 +51,10 @@ public class FlinkZooKeeperQuorumPeer {
 
 	public static void main(String[] args) {
 		try {
+			// startup checks and logging
+			EnvironmentInformation.logEnvironmentInfo(LOG, "ZooKeeper Quorum Peer", args);
+			EnvironmentInformation.checkJavaVersion();
+			
 			final ParameterTool params = ParameterTool.fromArgs(args);
 			final String zkConfigFile = params.getRequired("zkConfigFile");
 			final int peerId = params.getInt("peerId");
@@ -57,7 +63,7 @@ public class FlinkZooKeeperQuorumPeer {
 			runFlinkZkQuorumPeer(zkConfigFile, peerId);
 		}
 		catch (Throwable t) {
-			t.printStackTrace();
+			LOG.error("Error running ZooKeeper quorum peer: " + t.getMessage(), t);
 			System.exit(-1);
 		}
 	}
@@ -210,18 +216,16 @@ public class FlinkZooKeeperQuorumPeer {
 
 		dataDir.deleteOnExit();
 
-		// Write myid to file
-		PrintWriter writer = null;
+		LOG.info("Writing {} to myid file in 'dataDir'.", id);
+		
+		// Write myid to file. We use a FileWriter because it propagates I/O errors as exceptions,
+		// whereas a PrintWriter swallows them.
+		FileWriter writer = new FileWriter(new File(dataDir, "myid"));
 		try {
-			LOG.info("Writing {} to myid file in 'dataDir'.", id);
-
-			writer = new PrintWriter(new File(dataDir, "myid"));
-			writer.println(id);
+			writer.write(String.valueOf(id));
 		}
 		finally {
-			if (writer != null) {
-				writer.close();
-			}
+			writer.close();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0d96edb..dc1599a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1000,35 +1000,41 @@ object JobManager {
       configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
     }
 
-    // HA mode
-    val (hostname, port) = if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
-      // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is
-      // chosen.  For the FlinkMiniCluster you have to choose it on your own.
-      LOG.info("HA mode.")
-
-      if (config.getHost == null) {
-        throw new Exception("Missing parameter '--host'.")
+    // high availability mode
+    val (hostname: String, port: Int ) = 
+      if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
+        // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is
+        // chosen.  For the FlinkMiniCluster you have to choose it on your own.
+        LOG.info("Starting JobManager in High-Availability Mode")
+  
+        if (config.getHost() == null) {
+          throw new Exception("Missing parameter '--host'. Parameter is required when " +
+            "running in high-availability mode")
+        }
+  
+        // Let web server listen on random port
+        configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
+  
+        (config.getHost(), 0)
       }
-
-      // Let web server listen on random port
-      configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-
-      (config.getHost, NetUtils.getAvailablePort)
-    }
-    else {
-      if (config.getHost != null) {
-        throw new IllegalStateException("Specified explicit address for JobManager communication " +
-          "via CLI, but no ZooKeeper quorum has been configured. The task managers will not be " +
-          "able to find the correct JobManager to connect to. Please configure ZooKeeper or " +
-          "don't set the address explicitly (this will fallback to the address configured in " +
-          "in 'conf/flink-conf.yaml'.")
+      else {
+        LOG.info("Staring JobManager without high-availability")
+        
+        if (config.getHost() != null) {
+          throw new Exception("Found an explicit address for JobManager communication " +
+            "via the CLI option '--host'.\n" +
+            "This parameter must only be set if the JobManager is started in high-availability " +
+            "mode and connects to a ZooKeeper quorum.\n" +
+            "Please configure ZooKeeper or don't set the '--host' option, so that the JobManager " +
+            "uses the address configured under 'conf/flink-conf.yaml'.")
+        }
+  
+        val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+        val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+            ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+        (host, port)
       }
 
-      (configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null),
-        configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT))
-    }
-
     (configuration, config.getJobManagerMode(), config.getStreamingMode(), hostname, port)
   }