Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/01 10:06:34 UTC

[01/16] flink git commit: [FLINK-6498] Migrate Zookeeper configuration options

Repository: flink
Updated Branches:
  refs/heads/master daed46002 -> 6e3f839ac


[FLINK-6498] Migrate Zookeeper configuration options

This closes #4123.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8390181
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8390181
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8390181

Branch: refs/heads/master
Commit: f839018131024860a1b25b13cea7e1313add28d5
Parents: 3d2f3f6
Author: zjureel <zj...@gmail.com>
Authored: Wed Jun 14 17:38:25 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:07 2017 +0200

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkSecuredITCase.java |   2 +-
 .../flink/configuration/ConfigConstants.java    | 124 +++++++++++++++++--
 .../configuration/HighAvailabilityOptions.java  |  60 +++++++--
 .../services/MesosServicesUtils.java            |  10 +-
 .../flink/runtime/util/ZooKeeperUtils.java      |  53 +++-----
 .../ZooKeeperLeaderElectionTest.java            |   5 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |   7 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   6 +-
 8 files changed, 191 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
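
For quick orientation: the commit replaces Flink's string-constant lookups (ConfigConstants plus ConfigurationUtil) with typed ConfigOptions on HighAvailabilityOptions. A minimal before/after sketch, using the leader-path option from the diff below; the wrapper class is hypothetical and not part of the commit:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.HighAvailabilityOptions;

    // Hypothetical demo class, not part of this commit.
    public class ZkOptionMigrationSketch {
        public static void main(String[] args) {
            Configuration configuration = new Configuration();

            // Old pattern (removed below): resolve the key, its default constant
            // and its deprecated aliases by hand:
            //   ConfigurationUtil.getStringWithDeprecatedKeys(configuration,
            //       ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
            //       ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
            //       ConfigConstants.ZOOKEEPER_LEADER_PATH);

            // New pattern: the ConfigOption bundles its key, default value and
            // deprecated keys, so a plain getString() suffices.
            String leaderPath = configuration.getString(
                HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH);
            System.out.println(leaderPath); // "/leader" when nothing is configured
        }
    }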


http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index 6bd75d4..866b2f3 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -218,7 +218,7 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
 			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
 			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
-			config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
 			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
 			config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 35d3d13..d467dfa 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -944,65 +944,111 @@ public final class ConfigConstants {
 
 	// --------------------------- ZooKeeper ----------------------------------
 
-	/** ZooKeeper servers. */
+	/**
+	 * ZooKeeper servers.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_QUORUM}.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_QUORUM_KEY = "high-availability.zookeeper.quorum";
 
 	/**
 	 * File system state backend base path for recoverable state handles. Recovery state is written
 	 * to this path and the file state handles are persisted for recovery.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_STORAGE_PATH}.
 	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_STORAGE_PATH = "high-availability.zookeeper.storageDir";
 
-	/** ZooKeeper root path. */
+	/**
+	 * ZooKeeper root path.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_DIR_KEY = "high-availability.zookeeper.path.root";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_NAMESPACE_KEY = "high-availability.zookeeper.path.namespace";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_LATCH_PATH = "high-availability.zookeeper.path.latch";
 
-	/** ZooKeeper root path (ZNode) for job graphs. */
+	/**
+	 * ZooKeeper root path (ZNode) for job graphs.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH = "high-availability.zookeeper.path.jobgraphs";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader";
 
-	/** ZooKeeper root path (ZNode) for completed checkpoints. */
+	/**
+	 * ZooKeeper root path (ZNode) for completed checkpoints.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH = "high-availability.zookeeper.path.checkpoints";
 
-	/** ZooKeeper root path (ZNode) for checkpoint counters. */
+	/**
+	 * ZooKeeper root path (ZNode) for checkpoint counters.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "high-availability.zookeeper.path.checkpoint-counter";
 
-	/** ZooKeeper root path (ZNode) for Mesos workers. */
+	/**
+	 * ZooKeeper root path (ZNode) for Mesos workers.
+	 * @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_MESOS_WORKERS_PATH}.
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_MESOS_WORKERS_PATH = "high-availability.zookeeper.path.mesos-workers";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_SESSION_TIMEOUT}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_SESSION_TIMEOUT = "high-availability.zookeeper.client.session-timeout";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CONNECTION_TIMEOUT}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_CONNECTION_TIMEOUT = "high-availability.zookeeper.client.connection-timeout";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_RETRY_WAIT}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_RETRY_WAIT = "high-availability.zookeeper.client.retry-wait";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_MAX_RETRY_ATTEMPTS}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS = "high-availability.zookeeper.client.max-retry-attempts";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String HA_ZOOKEEPER_CLIENT_ACL = "high-availability.zookeeper.client.acl";
 
+	/** @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_DISABLE}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String ZOOKEEPER_SASL_DISABLE = "zookeeper.sasl.disable";
 
+	/** @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_SERVICE_NAME}. */
 	@PublicEvolving
+	@Deprecated
 	public static final String ZOOKEEPER_SASL_SERVICE_NAME = "zookeeper.sasl.service-name";
 
 	/** @deprecated Deprecated in favour of {@link #HA_ZOOKEEPER_QUORUM_KEY}. */
@@ -1632,51 +1678,103 @@ public final class ConfigConstants {
 
 	// --------------------------- ZooKeeper ----------------------------------
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}. */
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_DIR_KEY = "/flink";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_NAMESPACE_KEY = "/default";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}. */
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}. */
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}. */
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_MESOS_WORKERS_PATH}. */
+	@Deprecated
 	public static final String DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH = "/mesos-workers";
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_SESSION_TIMEOUT}. */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000;
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CONNECTION_TIMEOUT}. */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000;
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_RETRY_WAIT}. */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_RETRY_WAIT = 5000;
 
+	/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_MAX_RETRY_ATTEMPTS}. */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS = 3;
 
 	// - Defaults for required ZooKeeper configuration keys -------------------
 
-	/** ZooKeeper default client port. */
+	/**
+	 * ZooKeeper default client port.
+	 * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_CLIENT_PORT}.
+	 */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
 
-	/** ZooKeeper default init limit. */
+	/**
+	 * ZooKeeper default init limit.
+	 * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_INIT_LIMIT}.
+	 */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
 
-	/** ZooKeeper default sync limit. */
+	/**
+	 * ZooKeeper default sync limit.
+	 * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_SYNC_LIMIT}.
+	 */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
 
-	/** ZooKeeper default peer port. */
+	/**
+	 * ZooKeeper default peer port.
+	 * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_PEER_PORT}.
+	 */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_PEER_PORT = 2888;
 
-	/** ZooKeeper default leader port. */
+	/**
+	 * ZooKeeper default leader port.
+	 * @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_LEADER_PORT}.
+	 */
+	@Deprecated
 	public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
 
-	/** Defaults for ZK client security **/
+	/**
+	 * Defaults for ZK client security.
+	 * @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_DISABLE}.
+	 */
+	@Deprecated
 	public static final boolean DEFAULT_ZOOKEEPER_SASL_DISABLE = true;
 
-	/** ACL options supported "creator" or "open" */
+	/**
+	 * ACL options supported: "creator" or "open".
+	 * @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL}.
+	 */
+	@Deprecated
 	public static final String DEFAULT_HA_ZOOKEEPER_CLIENT_ACL = "open";
 
 	// ----------------------------- Metrics ----------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 2cfb25a..2b026b9 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -58,14 +58,6 @@ public class HighAvailabilityOptions {
 			key("high-availability.storageDir")
 			.noDefaultValue()
 			.withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir");
-
-	/**
-	 * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
-	 */
-	public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
-			key("high-availability.zookeeper.quorum")
-			.noDefaultValue()
-			.withDeprecatedKeys("recovery.zookeeper.quorum");
 	
 
 	// ------------------------------------------------------------------------
@@ -93,6 +85,14 @@ public class HighAvailabilityOptions {
 	// ------------------------------------------------------------------------
 
 	/**
+	 * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
+	 */
+	public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
+			key("high-availability.zookeeper.quorum")
+			.noDefaultValue()
+			.withDeprecatedKeys("recovery.zookeeper.quorum");
+
+	/**
 	 * The root path under which Flink stores its entries in ZooKeeper
 	 */
 	public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
@@ -100,6 +100,46 @@ public class HighAvailabilityOptions {
 			.defaultValue("/flink")
 			.withDeprecatedKeys("recovery.zookeeper.path.root");
 
+	public static final ConfigOption<String> HA_ZOOKEEPER_NAMESPACE =
+			key("high-availability.zookeeper.path.namespace")
+			.noDefaultValue()
+			.withDeprecatedKeys("recovery.zookeeper.path.namespace");
+
+	public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
+			key("high-availability.zookeeper.path.latch")
+			.defaultValue("/leaderlatch")
+			.withDeprecatedKeys("recovery.zookeeper.path.latch");
+
+	/** ZooKeeper root path (ZNode) for job graphs. */
+	public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
+			key("high-availability.zookeeper.path.jobgraphs")
+			.defaultValue("/jobgraphs")
+			.withDeprecatedKeys("recovery.zookeeper.path.jobgraphs");
+
+	public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
+			key("high-availability.zookeeper.path.leader")
+			.defaultValue("/leader")
+			.withDeprecatedKeys("recovery.zookeeper.path.leader");
+
+	/** ZooKeeper root path (ZNode) for completed checkpoints. */
+	public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
+			key("high-availability.zookeeper.path.checkpoints")
+			.defaultValue("/checkpoints")
+			.withDeprecatedKeys("recovery.zookeeper.path.checkpoints");
+
+	/** ZooKeeper root path (ZNode) for checkpoint counters. */
+	public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
+			key("high-availability.zookeeper.path.checkpoint-counter")
+			.defaultValue("/checkpoint-counter")
+			.withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter");
+
+	/** ZooKeeper root path (ZNode) for Mesos workers. */
+	@PublicEvolving
+	public static final ConfigOption<String> HA_ZOOKEEPER_MESOS_WORKERS_PATH =
+			key("high-availability.zookeeper.path.mesos-workers")
+			.defaultValue("/mesos-workers")
+			.withDeprecatedKeys("recovery.zookeeper.path.mesos-workers");
+
 	// ------------------------------------------------------------------------
 	//  ZooKeeper Client Settings
 	// ------------------------------------------------------------------------
@@ -128,6 +168,10 @@ public class HighAvailabilityOptions {
 			key("high-availability.zookeeper.path.running-registry")
 			.defaultValue("/running_job_registry/");
 
+	public static final ConfigOption<String> ZOOKEEPER_CLIENT_ACL =
+			key("high-availability.zookeeper.client.acl")
+			.defaultValue("open");
+
 	// ------------------------------------------------------------------------
 
 	/** Not intended to be instantiated */
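
The withDeprecatedKeys() calls above are what keep pre-1.2 "recovery.*" configurations working. A small sketch of the fallback behavior (hypothetical code, not part of the commit, assuming the options defined in this diff):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.HighAvailabilityOptions;

    // Hypothetical demo class illustrating deprecated-key resolution.
    public class DeprecatedKeyFallbackSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // An old-style entry written against the legacy key...
            conf.setString("recovery.zookeeper.path.jobgraphs", "/my-jobgraphs");

            // ...is still found through the new option, because the legacy key is
            // registered via withDeprecatedKeys("recovery.zookeeper.path.jobgraphs").
            String path = conf.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
            System.out.println(path); // "/my-jobgraphs"; "/jobgraphs" if nothing is set
        }
    }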

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
index a28020a..370a760 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.mesos.runtime.clusterframework.services;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
-import org.apache.flink.util.ConfigurationUtil;
 
 /**
  * Utilities for the {@link MesosServices}.
@@ -44,11 +43,8 @@ public class MesosServicesUtils {
 				return new StandaloneMesosServices();
 
 			case ZOOKEEPER:
-				final String zkMesosRootPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-					configuration,
-					ConfigConstants.HA_ZOOKEEPER_MESOS_WORKERS_PATH,
-					ConfigConstants.DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH,
-					ConfigConstants.ZOOKEEPER_MESOS_WORKERS_PATH);
+				final String zkMesosRootPath = configuration.getString(
+					HighAvailabilityOptions.HA_ZOOKEEPER_MESOS_WORKERS_PATH);
 
 				ZooKeeperUtilityFactory zooKeeperUtilityFactory = new ZooKeeperUtilityFactory(
 					configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 9ade5ec..a7ac500 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -25,10 +25,10 @@ import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.imps.DefaultACLProvider;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
-import org.apache.flink.util.ConfigurationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
@@ -87,8 +86,7 @@ public class ZooKeeperUtils {
 
 		String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
 
-		boolean disableSaslClient = configuration.getBoolean(ConfigConstants.ZOOKEEPER_SASL_DISABLE,
-				ConfigConstants.DEFAULT_ZOOKEEPER_SASL_DISABLE);
+		boolean disableSaslClient = configuration.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
 
 		ACLProvider aclProvider;
 
@@ -96,7 +94,7 @@ public class ZooKeeperUtils {
 
 		if(disableSaslClient && aclMode == ZkClientACLMode.CREATOR) {
 			String errorMessage = "Cannot set ACL role to " + aclMode +"  since SASL authentication is " +
-					"disabled through the " + ConfigConstants.ZOOKEEPER_SASL_DISABLE + " property";
+					"disabled through the " + SecurityOptions.ZOOKEEPER_SASL_DISABLE.key() + " property";
 			LOG.warn(errorMessage);
 			throw new IllegalConfigurationException(errorMessage);
 		}
@@ -185,11 +183,8 @@ public class ZooKeeperUtils {
 		final Configuration configuration,
 		final String pathSuffix)
 	{
-		String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-			configuration,
-			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
-			ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix;
+		String leaderPath = configuration.getString(
+			HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
 
 		return new ZooKeeperLeaderRetrievalService(client, leaderPath);
 	}
@@ -221,16 +216,10 @@ public class ZooKeeperUtils {
 		final Configuration configuration,
 		final String pathSuffix)
 	{
-		final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-			configuration,
-			ConfigConstants.HA_ZOOKEEPER_LATCH_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH,
-			ConfigConstants.ZOOKEEPER_LATCH_PATH) + pathSuffix;
-		final String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-			configuration,
-			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
-			ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix;
+		final String latchPath = configuration.getString(
+			HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix;
+		final String leaderPath = configuration.getString(
+			HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
 
 		return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
 	}
@@ -254,11 +243,7 @@ public class ZooKeeperUtils {
 		RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph");
 
 		// ZooKeeper submitted jobs root dir
-		String zooKeeperSubmittedJobsPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH,
-				ConfigConstants.ZOOKEEPER_JOBGRAPHS_PATH);
+		String zooKeeperSubmittedJobsPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		return new ZooKeeperSubmittedJobGraphStore(
 				client, zooKeeperSubmittedJobsPath, stateStorage, executor);
@@ -284,11 +269,8 @@ public class ZooKeeperUtils {
 
 		checkNotNull(configuration, "Configuration");
 
-		String checkpointsPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_CHECKPOINTS_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH,
-				ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH);
+		String checkpointsPath = configuration.getString(
+			HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH);
 
 		RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = createFileSystemStateStorage(
 			configuration,
@@ -317,11 +299,8 @@ public class ZooKeeperUtils {
 			Configuration configuration,
 			JobID jobId) {
 
-		String checkpointIdCounterPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
-				ConfigConstants.ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
+		String checkpointIdCounterPath = configuration.getString(
+				HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH);
 
 		checkpointIdCounterPath += ZooKeeperSubmittedJobGraphStore.getPathForJob(jobId);
 
@@ -391,11 +370,11 @@ public class ZooKeeperUtils {
 		 * Return the configured {@link ZkClientACLMode}.
 		 *
 		 * @param config The config to parse
-		 * @return Configured ACL mode or {@link ConfigConstants#DEFAULT_HA_ZOOKEEPER_CLIENT_ACL} if not
+		 * @return Configured ACL mode or the default defined by {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL} if not
 		 * configured.
 		 */
 		public static ZkClientACLMode fromConfig(Configuration config) {
-			String aclMode = config.getString(ConfigConstants.HA_ZOOKEEPER_CLIENT_ACL, null);
+			String aclMode = config.getString(HighAvailabilityOptions.ZOOKEEPER_CLIENT_ACL);
 			if (aclMode == null || aclMode.equalsIgnoreCase(ZkClientACLMode.OPEN.name())) {
 				return ZkClientACLMode.OPEN;
 			} else if (aclMode.equalsIgnoreCase(ZkClientACLMode.CREATOR.name())) {
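
fromConfig() above now reads the ACL mode through the new option. A flink-conf.yaml sketch of the two supported modes (host names are placeholders; per the check in createCuratorFramework() further up, "creator" only works when SASL is enabled, i.e. zookeeper.sasl.disable set to false):

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181  # placeholder hosts
    # Default is "open"; "creator" additionally requires SASL:
    high-availability.zookeeper.client.acl: creator
    zookeeper.sasl.disable: false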

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 6efd270..73cf063 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -298,7 +298,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		final String FAULTY_CONTENDER_URL = "faultyContender";
 		final String leaderPath = "/leader";
 
-		configuration.setString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH, leaderPath);
 
 		ZooKeeperLeaderElectionService leaderElectionService = null;
 		ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
@@ -463,8 +463,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 			testingContender = new TestingContender(TEST_URL, leaderElectionService);
 			listener = new TestingListener();
 
-			final String leaderPath = configuration.getString(ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
-					ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH);
+			final String leaderPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH);
 			cache = new NodeCache(client2, leaderPath);
 
 			ExistsCacheListener existsListener = new ExistsCacheListener(cache);

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index 80b8e18..3f2eea3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -392,8 +393,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 
 		// ZooKeeper
 		String currentJobsPath = config.getString(
-				ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+				HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
 
@@ -424,8 +424,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 
 		// ZooKeeper
 		String currentJobsPath = config.getString(
-			ConfigConstants.HA_ZOOKEEPER_JOBGRAPHS_PATH,
-			ConfigConstants.DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH);
+			HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
 		Stat stat = ZooKeeper.getClient().checkExists().forPath(currentJobsPath);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f8390181/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index f15314a..73279da 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -66,7 +66,7 @@ import java.util.Properties;
 import java.util.concurrent.Callable;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
-import static org.apache.flink.configuration.ConfigConstants.HA_ZOOKEEPER_NAMESPACE_KEY;
+import static org.apache.flink.configuration.HighAvailabilityOptions.HA_ZOOKEEPER_NAMESPACE;
 
 /**
  * Class handling the command line interface to the YARN session.
@@ -597,9 +597,9 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ?
 									cmd.getOptionValue(zookeeperNamespace.getOpt())
 									: yarnDescriptor.getFlinkConfiguration()
-									.getString(HA_ZOOKEEPER_NAMESPACE_KEY, cmd.getOptionValue(applicationId.getOpt()));
+									.getString(HA_ZOOKEEPER_NAMESPACE, cmd.getOptionValue(applicationId.getOpt()));
 			LOG.info("Going to use the ZK namespace: {}", zkNamespace);
-			yarnDescriptor.getFlinkConfiguration().setString(HA_ZOOKEEPER_NAMESPACE_KEY, zkNamespace);
+			yarnDescriptor.getFlinkConfiguration().setString(HA_ZOOKEEPER_NAMESPACE, zkNamespace);
 
 			try {
 				yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));


[14/16] flink git commit: [FLINK-7030] Build with scala-2.11 by default

Posted by ch...@apache.org.
[FLINK-7030] Build with scala-2.11 by default

This closes #4209.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/caf149eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/caf149eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/caf149eb

Branch: refs/heads/master
Commit: caf149eb40c35afb6efc76052dd5c0bc0ee8b229
Parents: 90a6407
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Fri Jun 23 13:41:55 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:04:06 2017 +0200

----------------------------------------------------------------------
 .travis.yml            | 25 ++++++++++++-------------
 docs/setup/building.md | 19 +++++--------------
 pom.xml                | 14 +++++++-------
 3 files changed, 24 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
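
In practice the flipped default means the following (commands as in the updated docs below, run from the Flink source root):

    # Scala 2.11 is now the default binary version:
    mvn clean install -DskipTests

    # Scala 2.10 builds now need the explicit profile:
    mvn clean install -DskipTests -Pscala-2.10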


http://git-wip-us.apache.org/repos/asf/flink/blob/caf149eb/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 9373cbf..d30e172 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -17,33 +17,32 @@ matrix:
   include:
   # Always run test groups A and B together
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.8.0 -Dscala-2.11 -Pflink-fast-tests-a,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.8.0 -Pflink-fast-tests-a,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.8.0 -Dscala-2.11 -Pflink-fast-tests-b,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.8.0 -Pflink-fast-tests-b,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.8.0 -Dscala-2.11 -Pflink-fast-tests-c,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.8.0 -Pflink-fast-tests-c,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
 
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.7.3 -Pflink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.7.3 -Dscala-2.10 -Pflink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true"
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.7.3 -Pflink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.7.3 -Dscala-2.10 -Pflink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true"
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.7.3 -Pflink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.7.3 -Dscala-2.10 -Pflink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true"
 
     - jdk: "openjdk7"
-      env: PROFILE="-Dhadoop.version=2.6.5 -Dscala-2.11 -Pflink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.6.5 -Pflink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true"
     - jdk: "openjdk7"
-      env: PROFILE="-Dhadoop.version=2.6.5 -Dscala-2.11 -Pflink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.6.5 -Pflink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true"
     - jdk: "openjdk7"
-      env: PROFILE="-Dhadoop.version=2.6.5 -Dscala-2.11 -Pflink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.6.5 -Pflink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true"
 
     - jdk: "openjdk7"
-      env: PROFILE="-Dhadoop.version=2.4.1 -Pflink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.10 -Pflink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true"
     - jdk: "openjdk7"
-      env: PROFILE="-Dhadoop.version=2.4.1 -Pflink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.10 -Pflink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true"
     - jdk: "openjdk7"
-      env: PROFILE="-Dhadoop.version=2.4.1 -Pflink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true"
-
+      env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.10 -Pflink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true"
 
 git:
   depth: 100

http://git-wip-us.apache.org/repos/asf/flink/blob/caf149eb/docs/setup/building.md
----------------------------------------------------------------------
diff --git a/docs/setup/building.md b/docs/setup/building.md
index 8a71fc6..3ef372e 100644
--- a/docs/setup/building.md
+++ b/docs/setup/building.md
@@ -115,25 +115,16 @@ The `-Pvendor-repos` activates a Maven [build profile](http://maven.apache.org/g
 
 Flink has APIs, libraries, and runtime modules written in [Scala](http://scala-lang.org). Users of the Scala API and libraries may have to match the Scala version of Flink with the Scala version of their projects (because Scala is not strictly backwards compatible).
 
-**By default, Flink is built with the Scala 2.10**. To build Flink with Scala *2.11*, you can change the default Scala *binary version* with the following script:
+**By default, Flink is built with Scala 2.11**. To build Flink with Scala *2.10*, you can change the default Scala *binary version* by using the *scala-2.10* build profile:
 
 ~~~bash
-# Switch Scala binary version between 2.10 and 2.11
-tools/change-scala-version.sh 2.11
-# Build with Scala version 2.11
-mvn clean install -DskipTests
+# Build with Scala version 2.10
+mvn clean install -DskipTests -Pscala-2.10
 ~~~
 
-To build against custom Scala versions, you need to switch to the appropriate binary version and supply the *language version* as an additional build property. For example, to build against Scala 2.11.4, you have to execute:
-
-~~~bash
-# Switch Scala binary version to 2.11
-tools/change-scala-version.sh 2.11
-# Build with custom Scala version 2.11.4
-mvn clean install -DskipTests -Dscala.version=2.11.4
-~~~
+To build against custom Scala versions, you need to define a new custom build profile that overrides the *scala.version* and *scala.binary.version* values.
 
-Flink is developed against Scala *2.10* and tested additionally against Scala *2.11*. These two versions are known to be compatible. Earlier versions (like Scala *2.9*) are *not* compatible.
+Flink is developed against Scala *2.11* and tested additionally against Scala *2.10*. These two versions are known to be compatible. Earlier versions (like Scala *2.9*) are *not* compatible.
 
 Newer versions may be compatible, depending on breaking changes in the language features used by Flink, and the availability of Flink's dependencies in those Scala versions. The dependencies written in Scala include for example *Kafka*, *Akka*, *Scalatest*, and *scopt*.
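
Since the concrete custom-version example was dropped from the docs, here is a hypothetical profile of the shape the text describes (added to the root pom.xml, mirroring the scala-2.10 profile in the diff below; the id and versions are illustrative only):

    <!-- Hypothetical profile for building against Scala 2.11.4 -->
    <profile>
        <id>scala-2.11.4</id>
        <properties>
            <scala.version>2.11.4</scala.version>
            <scala.binary.version>2.11</scala.binary.version>
        </properties>
    </profile>

It would then be activated with mvn clean install -DskipTests -Pscala-2.11.4.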
 

http://git-wip-us.apache.org/repos/asf/flink/blob/caf149eb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d73f681..2115f12 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,8 +100,8 @@ under the License.
 		<java.version>1.7</java.version>
 		<scala.macros.version>2.1.0</scala.macros.version>
 		<!-- Default scala versions, may be overwritten by build profiles -->
-		<scala.version>2.10.6</scala.version>
-		<scala.binary.version>2.10</scala.binary.version>
+		<scala.version>2.11.11</scala.version>
+		<scala.binary.version>2.11</scala.binary.version>
 		<chill.version>0.7.4</chill.version>
 		<asm.version>5.0.4</asm.version>
 		<zookeeper.version>3.4.6</zookeeper.version>
@@ -497,17 +497,17 @@ under the License.
 
 	<profiles>
 		
-		<!-- Profile to switch to Scala Version 2.11 -->
+		<!-- Profile to switch to Scala Version 2.10 -->
 		<profile>
-			<id>scala-2.11</id>
+			<id>scala-2.10</id>
 			<activation>
 				<property>
-					<name>scala-2.11</name>
+					<name>scala-2.10</name>
 				</property>
 			</activation>
 			<properties>
-				<scala.version>2.11.11</scala.version>
-				<scala.binary.version>2.11</scala.binary.version>
+				<scala.version>2.10.6</scala.version>
+				<scala.binary.version>2.10</scala.binary.version>
 			</properties>
 		</profile>
 


[07/16] flink git commit: [FLINK-6221] Add PrometheusReporter

Posted by ch...@apache.org.
[FLINK-6221] Add PrometheusReporter

This closes #3833.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb9505dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb9505dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb9505dd

Branch: refs/heads/master
Commit: bb9505ddee6fbf42ef24853dab8475cff9c7948d
Parents: cd45825
Author: Maximilian Bode <ma...@tngtech.com>
Authored: Sat May 6 02:49:42 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:08 2017 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  29 ++
 flink-dist/pom.xml                              |   7 +
 flink-dist/src/main/assemblies/opt.xml          |   7 +
 flink-metrics/flink-metrics-prometheus/pom.xml  | 129 +++++++++
 .../metrics/prometheus/PrometheusReporter.java  | 269 +++++++++++++++++++
 .../prometheus/PrometheusReporterTest.java      | 190 +++++++++++++
 .../src/test/resources/log4j-test.properties    |  27 ++
 flink-metrics/pom.xml                           |   1 +
 pom.xml                                         |   1 +
 9 files changed, 660 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index fc7b595..06ed9ef 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -415,6 +415,35 @@ metrics.reporter.grph.protocol: TCP
 
 {% endhighlight %}
 
+### Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)
+
+In order to use this reporter you must copy `/opt/flink-metrics-prometheus-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Parameters:
+
+- `port` - (optional) the port the Prometheus exporter listens on, defaults to [9249](https://github.com/prometheus/prometheus/wiki/Default-port-allocations).
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: prom
+metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
+
+{% endhighlight %}
+
+Flink metric types are mapped to Prometheus metric types as follows: 
+
+| Flink     | Prometheus | Note                                     |
+| --------- |------------|------------------------------------------|
+| Counter   | Gauge      |Prometheus counters cannot be decremented.|
+| Gauge     | Gauge      |                                          |
+| Histogram | Summary    |Quantiles .5, .75, .95, .98, .99 and .999 |
+| Meter     | Gauge      |The gauge exports the meter's rate.       |
+
+All Flink metrics variables, such as `<host>`, `<job_name>`, `<tm_id>`, `<subtask_index>`, `<task_name>` and `<operator_name>`, are exported to Prometheus as labels. 
+
 ### StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
 
 In order to use this reporter you must copy `/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/lib` folder
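
Returning to the Prometheus reporter added above: reporter arguments are read from metrics.reporter.<name>.* keys (as with the Graphite example earlier in this file), so the optional port parameter sits next to the class entry. A flink-conf.yaml sketch (9250 is an arbitrary non-default choice):

    metrics.reporters: prom
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9250  # optional, defaults to 9249

The endpoint the reporter serves can then be checked directly; PrometheusEndpoint answers on /metrics:

    curl http://localhost:9250/metrics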

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 4070dbb..06a656d 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -264,6 +264,13 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-prometheus</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-metrics-statsd</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index bb04d28..0afa1d3 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -98,6 +98,13 @@
 		</file>
 
 		<file>
+			<source>../flink-metrics/flink-metrics-prometheus/target/flink-metrics-prometheus-${project.version}-shaded.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-metrics-prometheus-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<file>
 			<source>../flink-metrics/flink-metrics-statsd/target/flink-metrics-statsd-${project.version}.jar</source>
 			<outputDirectory>opt/</outputDirectory>
 			<destName>flink-metrics-statsd-${project.version}.jar</destName>

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/flink-metrics-prometheus/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/pom.xml b/flink-metrics/flink-metrics-prometheus/pom.xml
new file mode 100644
index 0000000..4884433
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics-prometheus</artifactId>
+	<name>flink-metrics-prometheus</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-annotations</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>io.prometheus</groupId>
+			<artifactId>simpleclient</artifactId>
+			<version>${prometheus.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>io.prometheus</groupId>
+			<artifactId>simpleclient_servlet</artifactId>
+			<version>${prometheus.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.nanohttpd</groupId>
+			<artifactId>nanohttpd</artifactId>
+			<version>2.2.0</version>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.mashape.unirest</groupId>
+			<artifactId>unirest-java</artifactId>
+			<version>1.4.9</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadedArtifactAttached>true</shadedArtifactAttached>
+							<relocations combine.children="append">
+								<relocation>
+									<pattern>io.prometheus.client</pattern>
+									<shadedPattern>org.apache.flink.shaded.io.prometheus.client</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>fi.iki.elonen</pattern>
+									<shadedPattern>org.apache.flink.shaded.fi.iki.elonen</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
new file mode 100644
index 0000000..d23be8c
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus.
+ */
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+	private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporter.class);
+
+	static final String ARG_PORT = "port";
+	private static final int DEFAULT_PORT = 9249;
+
+	private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
+	private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
+		@Override
+		public String filterCharacters(String input) {
+			return replaceInvalidChars(input);
+		}
+	};
+
+	private static final char SCOPE_SEPARATOR = '_';
+	private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR;
+
+	private PrometheusEndpoint prometheusEndpoint;
+	private final Map<String, Collector> collectorsByMetricName = new HashMap<>();
+
+	@VisibleForTesting
+	static String replaceInvalidChars(final String input) {
+		// https://prometheus.io/docs/instrumenting/writing_exporters/
+		// Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
+		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+	}
+
+	@Override
+	public void open(MetricConfig config) {
+		int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+		LOG.info("Using port {}.", port);
+		prometheusEndpoint = new PrometheusEndpoint(port);
+		try {
+			prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
+		} catch (IOException e) {
+			final String msg = "Could not start PrometheusEndpoint on port " + port;
+			LOG.warn(msg, e);
+			throw new RuntimeException(msg, e);
+		}
+	}
+
+	@Override
+	public void close() {
+		prometheusEndpoint.stop();
+		CollectorRegistry.defaultRegistry.clear();
+	}
+
+	@Override
+	public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+		final String scope = SCOPE_PREFIX + getLogicalScope(group);
+
+		List<String> dimensionKeys = new LinkedList<>();
+		List<String> dimensionValues = new LinkedList<>();
+		for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
+			final String key = dimension.getKey();
+			dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key.substring(1, key.length() - 1)));
+			dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
+		}
+
+		final String validMetricName = scope + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName);
+		final String metricIdentifier = group.getMetricIdentifier(metricName);
+		final Collector collector;
+		if (metric instanceof Gauge) {
+			collector = createGauge((Gauge) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+		} else if (metric instanceof Counter) {
+			collector = createGauge((Counter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+		} else if (metric instanceof Meter) {
+			collector = createGauge((Meter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+		} else if (metric instanceof Histogram) {
+			collector = createSummary((Histogram) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+		} else {
+			LOG.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+				metric.getClass().getName());
+			return;
+		}
+		collector.register();
+		collectorsByMetricName.put(metricName, collector);
+	}
+
+	@Override
+	public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+		CollectorRegistry.defaultRegistry.unregister(collectorsByMetricName.get(metricName));
+		collectorsByMetricName.remove(metricName);
+	}
+
+	@SuppressWarnings("unchecked")
+	private static String getLogicalScope(MetricGroup group) {
+		return ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+	}
+
+	private Collector createGauge(final Gauge gauge, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
+		return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() {
+			@Override
+			public double get() {
+				final Object value = gauge.getValue();
+				if (value instanceof Double) {
+					return (double) value;
+				}
+				if (value instanceof Number) {
+					return ((Number) value).doubleValue();
+				} else if (value instanceof Boolean) {
+					return ((Boolean) value) ? 1 : 0;
+				} else {
+					LOG.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.",
+						gauge, value.getClass().getName());
+					return 0;
+				}
+			}
+		});
+	}
+
+	private static Collector createGauge(final Counter counter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
+		return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() {
+			@Override
+			public double get() {
+				return (double) counter.getCount();
+			}
+		});
+	}
+
+	private Collector createGauge(final Meter meter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
+		return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() {
+			@Override
+			public double get() {
+				return meter.getRate();
+			}
+		});
+	}
+
+	private static Collector newGauge(String name, String identifier, List<String> labelNames, List<String> labelValues, io.prometheus.client.Gauge.Child child) {
+		return io.prometheus.client.Gauge
+			.build()
+			.name(name)
+			.help(identifier)
+			.labelNames(toArray(labelNames))
+			.create()
+			.setChild(child, toArray(labelValues));
+	}
+
+	private static HistogramSummaryProxy createSummary(final Histogram histogram, final String name, final String identifier, final List<String> dimensionKeys, final List<String> dimensionValues) {
+		return new HistogramSummaryProxy(histogram, name, identifier, dimensionKeys, dimensionValues);
+	}
+
+	static class PrometheusEndpoint extends NanoHTTPD {
+		static final String MIME_TYPE = "text/plain";
+
+		PrometheusEndpoint(int port) {
+			super(port);
+		}
+
+		@Override
+		public Response serve(IHTTPSession session) {
+			if (session.getUri().equals("/metrics")) {
+				StringWriter writer = new StringWriter();
+				try {
+					TextFormat.write004(writer, CollectorRegistry.defaultRegistry.metricFamilySamples());
+				} catch (IOException e) {
+					return newFixedLengthResponse(Response.Status.INTERNAL_ERROR, MIME_TYPE, "Unable to output metrics");
+				}
+				return newFixedLengthResponse(Response.Status.OK, TextFormat.CONTENT_TYPE_004, writer.toString());
+			} else {
+				return newFixedLengthResponse(Response.Status.NOT_FOUND, MIME_TYPE, "Not found");
+			}
+		}
+	}
+
+	private static class HistogramSummaryProxy extends Collector {
+		private static final List<Double> QUANTILES = Arrays.asList(.5, .75, .95, .98, .99, .999);
+
+		private final Histogram histogram;
+		private final String metricName;
+		private final String metricIdentifier;
+		private final List<String> labelNamesWithQuantile;
+		private final List<String> labelValues;
+
+		HistogramSummaryProxy(final Histogram histogram, final String metricName, final String metricIdentifier, final List<String> labelNames, final List<String> labelValues) {
+			this.histogram = histogram;
+			this.metricName = metricName;
+			this.metricIdentifier = metricIdentifier;
+			this.labelNamesWithQuantile = addToList(labelNames, "quantile");
+			this.labelValues = labelValues;
+		}
+
+		@Override
+		public List<MetricFamilySamples> collect() {
+			// We cannot use SummaryMetricFamily because it is impossible to get a sum of all values (at least for Dropwizard histograms,
+			// whose snapshot's values array only holds a sample of recent values).
+
+			final HistogramStatistics statistics = histogram.getStatistics();
+
+			List<MetricFamilySamples.Sample> samples = new LinkedList<>();
+			samples.add(new MetricFamilySamples.Sample(metricName + "_count",
+				labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount()));
+			for (final Double quantile : QUANTILES) {
+				samples.add(new MetricFamilySamples.Sample(metricName, labelNamesWithQuantile,
+					addToList(labelValues, quantile.toString()),
+					statistics.getQuantile(quantile)));
+			}
+			return Collections.singletonList(new MetricFamilySamples(metricName, Type.SUMMARY, metricIdentifier, samples));
+		}
+	}
+
+	private static List<String> addToList(List<String> list, String element) {
+		final List<String> result = new ArrayList<>(list);
+		result.add(element);
+		return result;
+	}
+
+	private static String[] toArray(List<String> labelNames) {
+		return labelNames.toArray(new String[labelNames.size()]);
+	}
+}
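
The core pattern in the reporter above is worth calling out: every Flink metric is exposed through a Prometheus collector whose Child reads the value lazily at scrape time, so nothing is pushed. A minimal standalone sketch of that pattern against the simpleclient API (metric name and label values are illustrative, not taken from this patch):

import io.prometheus.client.Collector;
import io.prometheus.client.Gauge;

public class GaugeChildSketch {
	public static void main(String[] args) {
		// Mutable state standing in for a Flink Counter.
		final long[] count = {7};

		// get() runs on every scrape, so the exposed value always
		// reflects the current counter state without explicit set() calls.
		Collector collector = Gauge.build()
			.name("flink_taskmanager_testCounter")
			.help("host.taskmanager.tm.testCounter")
			.labelNames("host", "tm_id")
			.create()
			.setChild(new Gauge.Child() {
				@Override
				public double get() {
					return (double) count[0];
				}
			}, "hostname", "tm");

		collector.register();
	}
}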

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
new file mode 100644
index 0000000..83b7b41
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.util.TestMeter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.util.TestLogger;
+
+import com.mashape.unirest.http.HttpResponse;
+import com.mashape.unirest.http.Unirest;
+import com.mashape.unirest.http.exceptions.UnirestException;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+
+import static org.apache.flink.metrics.prometheus.PrometheusReporter.ARG_PORT;
+import static org.apache.flink.runtime.metrics.scope.ScopeFormat.SCOPE_SEPARATOR;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for {@link PrometheusReporter}.
+ */
+public class PrometheusReporterTest extends TestLogger {
+	private static final int NON_DEFAULT_PORT = 9429;
+
+	private static final String HOST_NAME = "hostname";
+	private static final String TASK_MANAGER = "tm";
+
+	private static final String HELP_PREFIX = "# HELP ";
+	private static final String TYPE_PREFIX = "# TYPE ";
+	private static final String DIMENSIONS = "host=\"" + HOST_NAME + "\",tm_id=\"" + TASK_MANAGER + "\"";
+	private static final String DEFAULT_LABELS = "{" + DIMENSIONS + ",}";
+	private static final String SCOPE_PREFIX = "flink_taskmanager_";
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	private final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter()));
+	private final MetricReporter reporter = registry.getReporters().get(0);
+
+	@Test
+	public void counterIsReportedAsPrometheusGauge() throws UnirestException {
+		// Prometheus counters may not decrease
+		Counter testCounter = new SimpleCounter();
+		testCounter.inc(7);
+
+		String counterName = "testCounter";
+		String gaugeName = SCOPE_PREFIX + counterName;
+
+		assertThat(addMetricAndPollResponse(testCounter, counterName),
+			equalTo(HELP_PREFIX + gaugeName + " " + getFullMetricName(counterName) + "\n" +
+				TYPE_PREFIX + gaugeName + " gauge" + "\n" +
+				gaugeName + DEFAULT_LABELS + " 7.0" + "\n"));
+	}
+
+	@Test
+	public void gaugeIsReportedAsPrometheusGauge() throws UnirestException {
+		Gauge<Integer> testGauge = new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return 1;
+			}
+		};
+
+		String gaugeName = "testGauge";
+		String prometheusGaugeName = SCOPE_PREFIX + gaugeName;
+
+		assertThat(addMetricAndPollResponse(testGauge, gaugeName),
+			equalTo(HELP_PREFIX + prometheusGaugeName + " " + getFullMetricName(gaugeName) + "\n" +
+				TYPE_PREFIX + prometheusGaugeName + " gauge" + "\n" +
+				prometheusGaugeName + DEFAULT_LABELS + " 1.0" + "\n"));
+	}
+
+	@Test
+	public void histogramIsReportedAsPrometheusSummary() throws UnirestException {
+		Histogram testHistogram = new TestingHistogram();
+
+		String histogramName = "testHistogram";
+		String summaryName = SCOPE_PREFIX + histogramName;
+
+		String response = addMetricAndPollResponse(testHistogram, histogramName);
+		assertThat(response, containsString(HELP_PREFIX + summaryName + " " + getFullMetricName(histogramName) + "\n" +
+			TYPE_PREFIX + summaryName + " summary" + "\n" +
+			summaryName + "_count" + DEFAULT_LABELS + " 1.0" + "\n"));
+		for (String quantile : Arrays.asList("0.5", "0.75", "0.95", "0.98", "0.99", "0.999")) {
+			assertThat(response, containsString(
+				summaryName + "{" + DIMENSIONS + ",quantile=\"" + quantile + "\",} " + quantile + "\n"));
+		}
+	}
+
+	@Test
+	public void meterRateIsReportedAsPrometheusGauge() throws UnirestException {
+		Meter testMeter = new TestMeter();
+
+		String meterName = "testMeter";
+		String counterName = SCOPE_PREFIX + meterName;
+
+		assertThat(addMetricAndPollResponse(testMeter, meterName),
+			equalTo(HELP_PREFIX + counterName + " " + getFullMetricName(meterName) + "\n" +
+				TYPE_PREFIX + counterName + " gauge" + "\n" +
+				counterName + DEFAULT_LABELS + " 5.0" + "\n"));
+	}
+
+	@Test
+	public void endpointIsUnavailableAfterReporterIsClosed() throws UnirestException {
+		reporter.close();
+		thrown.expect(UnirestException.class);
+		pollMetrics();
+	}
+
+	@Test
+	public void invalidCharactersAreReplacedWithUnderscore() {
+		assertThat(PrometheusReporter.replaceInvalidChars(""), equalTo(""));
+		assertThat(PrometheusReporter.replaceInvalidChars("abc"), equalTo("abc"));
+		assertThat(PrometheusReporter.replaceInvalidChars("abc\""), equalTo("abc_"));
+		assertThat(PrometheusReporter.replaceInvalidChars("\"abc"), equalTo("_abc"));
+		assertThat(PrometheusReporter.replaceInvalidChars("\"abc\""), equalTo("_abc_"));
+		assertThat(PrometheusReporter.replaceInvalidChars("\"a\"b\"c\""), equalTo("_a_b_c_"));
+		assertThat(PrometheusReporter.replaceInvalidChars("\"\"\"\""), equalTo("____"));
+		assertThat(PrometheusReporter.replaceInvalidChars("    "), equalTo("____"));
+		assertThat(PrometheusReporter.replaceInvalidChars("\"ab ;(c)'"), equalTo("_ab___c__"));
+		assertThat(PrometheusReporter.replaceInvalidChars("a b c"), equalTo("a_b_c"));
+		assertThat(PrometheusReporter.replaceInvalidChars("a b c "), equalTo("a_b_c_"));
+		assertThat(PrometheusReporter.replaceInvalidChars("a;b'c*"), equalTo("a_b_c_"));
+		assertThat(PrometheusReporter.replaceInvalidChars("a,=;:?'b,=;:?'c"), equalTo("a___:__b___:__c"));
+	}
+
+	private String addMetricAndPollResponse(Metric metric, String metricName) throws UnirestException {
+		reporter.notifyOfAddedMetric(metric, metricName, new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER)));
+		return pollMetrics().getBody();
+	}
+
+	private static HttpResponse<String> pollMetrics() throws UnirestException {
+		return Unirest.get("http://localhost:" + NON_DEFAULT_PORT + "/metrics").asString();
+	}
+
+	private static String getFullMetricName(String metricName) {
+		return HOST_NAME + SCOPE_SEPARATOR + "taskmanager" + SCOPE_SEPARATOR + TASK_MANAGER + SCOPE_SEPARATOR + metricName;
+	}
+
+	private static Configuration createConfigWithOneReporter() {
+		Configuration cfg = new Configuration();
+		cfg.setString(MetricOptions.REPORTERS_LIST, "test1");
+		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." +
+			ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getName());
+		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ARG_PORT, "" + NON_DEFAULT_PORT);
+		return cfg;
+	}
+
+	@After
+	public void closeReporterAndShutdownRegistry() {
+		reporter.close();
+		registry.shutdown();
+	}
+}
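
Outside the test, the endpoint can be verified with plain JDK HTTP as well; a hedged sketch (port 9429 mirrors NON_DEFAULT_PORT above, adjust to the configured reporter port):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ScrapeSketch {
	public static void main(String[] args) throws Exception {
		HttpURLConnection conn = (HttpURLConnection)
			new URL("http://localhost:9429/metrics").openConnection();
		try (BufferedReader in = new BufferedReader(
				new InputStreamReader(conn.getInputStream()))) {
			String line;
			while ((line = in.readLine()) != null) {
				System.out.println(line); // Prometheus text format 0.0.4
			}
		} finally {
			conn.disconnect();
		}
	}
}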

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/flink-metrics-prometheus/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-prometheus/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
index 3be21aa..3b655c4 100644
--- a/flink-metrics/pom.xml
+++ b/flink-metrics/pom.xml
@@ -39,6 +39,7 @@ under the License.
 		<module>flink-metrics-ganglia</module>
 		<module>flink-metrics-graphite</module>
 		<module>flink-metrics-jmx</module>
+		<module>flink-metrics-prometheus</module>
 		<module>flink-metrics-statsd</module>
 		<module>flink-metrics-datadog</module>
 	</modules>

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 90de6a1..d73f681 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,6 +108,7 @@ under the License.
 		<curator.version>2.12.0</curator.version>
 		<jackson.version>2.7.4</jackson.version>
 		<metrics.version>3.1.0</metrics.version>
+		<prometheus.version>0.0.21</prometheus.version>
 		<junit.version>4.12</junit.version>
 		<mockito.version>1.10.19</mockito.version>
 		<powermock.version>1.6.5</powermock.version>


[08/16] flink git commit: [FLINK-6638] Allow overriding default for primitive ConfigOption

Posted by ch...@apache.org.
[FLINK-6638] Allow overriding default for primitive ConfigOption

This closes #4016.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60a59d27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60a59d27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60a59d27

Branch: refs/heads/master
Commit: 60a59d27012a0343d1c9bff8b8816284d6ef762a
Parents: f839018
Author: zentol <ch...@apache.org>
Authored: Tue May 30 09:42:31 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:08 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/Configuration.java      | 90 ++++++++++++++++++++
 .../configuration/DelegatingConfiguration.java  | 25 ++++++
 2 files changed, 115 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60a59d27/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index ea0c419..a1da5b3 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -219,6 +219,24 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	}
 
 	/**
+	 * Returns the value associated with the given config option as an integer.
+	 * If no value is mapped under any key of the option, it returns the specified
+	 * default instead of the option's default value.
+	 *
+	 * @param configOption The configuration option
+	 * @param overrideDefault The value to return if no value was mapped under any key of the option
+	 * @return the configured value associated with the given config option, or the overrideDefault
+	 */
+	@PublicEvolving
+	public int getInteger(ConfigOption<Integer> configOption, int overrideDefault) {
+		Object o = getRawValueFromOption(configOption);
+		if (o == null) {
+			return overrideDefault;
+		}
+		return convertToInt(o, configOption.defaultValue());
+	}
+
+	/**
 	 * Adds the given key/value pair to the configuration object.
 	 * 
 	 * @param key
@@ -275,6 +293,24 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	}
 
 	/**
+	 * Returns the value associated with the given config option as a long integer.
+	 * If no value is mapped under any key of the option, it returns the specified
+	 * default instead of the option's default value.
+	 *
+	 * @param configOption The configuration option
+	 * @param overrideDefault The value to return if no value was mapped under any key of the option
+	 * @return the configured value associated with the given config option, or the overrideDefault
+	 */
+	@PublicEvolving
+	public long getLong(ConfigOption<Long> configOption, long overrideDefault) {
+		Object o = getRawValueFromOption(configOption);
+		if (o == null) {
+			return overrideDefault;
+		}
+		return convertToLong(o, configOption.defaultValue());
+	}
+
+	/**
 	 * Adds the given key/value pair to the configuration object.
 	 * 
 	 * @param key
@@ -331,6 +367,24 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	}
 
 	/**
+	 * Returns the value associated with the given config option as a boolean.
+	 * If no value is mapped under any key of the option, it returns the specified
+	 * default instead of the option's default value.
+	 *
+	 * @param configOption The configuration option
+	 * @param overrideDefault The value to return if no value was mapped under any key of the option
+	 * @return the configured value associated with the given config option, or the overrideDefault
+	 */
+	@PublicEvolving
+	public boolean getBoolean(ConfigOption<Boolean> configOption, boolean overrideDefault) {
+		Object o = getRawValueFromOption(configOption);
+		if (o == null) {
+			return overrideDefault;
+		}
+		return convertToBoolean(o);
+	}
+
+	/**
 	 * Adds the given key/value pair to the configuration object.
 	 * 
 	 * @param key
@@ -387,6 +441,24 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	}
 
 	/**
+	 * Returns the value associated with the given config option as a float.
+	 * If no value is mapped under any key of the option, it returns the specified
+	 * default instead of the option's default value.
+	 *
+	 * @param configOption The configuration option
+	 * @param overrideDefault The value to return if no value was mapped under any key of the option
+	 * @return the configured value associated with the given config option, or the overrideDefault
+	 */
+	@PublicEvolving
+	public float getFloat(ConfigOption<Float> configOption, float overrideDefault) {
+		Object o = getRawValueFromOption(configOption);
+		if (o == null) {
+			return overrideDefault;
+		}
+		return convertToFloat(o, configOption.defaultValue());
+	}
+
+	/**
 	 * Adds the given key/value pair to the configuration object.
 	 * 
 	 * @param key
@@ -443,6 +515,24 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	}
 
 	/**
+	 * Returns the value associated with the given config option as a {@code double}.
+	 * If no value is mapped under any key of the option, it returns the specified
+	 * default instead of the option's default value.
+	 *
+	 * @param configOption The configuration option
+	 * @param overrideDefault The value to return if no value was mapped under any key of the option
+	 * @return the configured value associated with the given config option, or the overrideDefault
+	 */
+	@PublicEvolving
+	public double getDouble(ConfigOption<Double> configOption, double overrideDefault) {
+		Object o = getRawValueFromOption(configOption);
+		if (o == null) {
+			return overrideDefault;
+		}
+		return convertToDouble(o, configOption.defaultValue());
+	}
+
+	/**
 	 * Adds the given key/value pair to the configuration object.
 	 * 
 	 * @param key

http://git-wip-us.apache.org/repos/asf/flink/blob/60a59d27/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index 29f76b5..8cac66c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -111,6 +111,11 @@ public final class DelegatingConfiguration extends Configuration {
 	public int getInteger(ConfigOption<Integer> configOption) {
 		return  this.backingConfig.getInteger(prefixOption(configOption, prefix));
 	}
+	
+	@Override
+	public int getInteger(ConfigOption<Integer> configOption, int overrideDefault) {
+		return this.backingConfig.getInteger(configOption, overrideDefault);
+	}
 
 	@Override
 	public void setInteger(String key, int value) {
@@ -133,6 +138,11 @@ public final class DelegatingConfiguration extends Configuration {
 	}
 
 	@Override
+	public long getLong(ConfigOption<Long> configOption, long overrideDefault) {
+		return this.backingConfig.getLong(configOption, overrideDefault);
+	}
+
+	@Override
 	public void setLong(String key, long value) {
 		this.backingConfig.setLong(this.prefix + key, value);
 	}
@@ -163,6 +173,11 @@ public final class DelegatingConfiguration extends Configuration {
 	}
 
 	@Override
+	public boolean getBoolean(ConfigOption<Boolean> configOption, boolean overrideDefault) {
+		return this.backingConfig.getBoolean(configOption, overrideDefault);
+	}
+
+	@Override
 	public float getFloat(String key, float defaultValue) {
 		return this.backingConfig.getFloat(this.prefix + key, defaultValue);
 	}
@@ -173,6 +188,11 @@ public final class DelegatingConfiguration extends Configuration {
 	}
 
 	@Override
+	public float getFloat(ConfigOption<Float> configOption, float overrideDefault) {
+		return this.backingConfig.getFloat(configOption, overrideDefault);
+	}
+
+	@Override
 	public void setFloat(String key, float value) {
 		this.backingConfig.setFloat(this.prefix + key, value);
 	}
@@ -193,6 +213,11 @@ public final class DelegatingConfiguration extends Configuration {
 	}
 
 	@Override
+	public double getDouble(ConfigOption<Double> configOption, double overrideDefault) {
+		return this.backingConfig.getDouble(configOption, overrideDefault);
+	}
+
+	@Override
 	public void setDouble(String key, double value) {
 		this.backingConfig.setDouble(this.prefix + key, value);
 	}
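
The contract of the new getters, as a hedged usage sketch (the option key is hypothetical, chosen only for illustration): the per-call override applies only when no value is mapped under any key of the option; a configured value still wins over both defaults.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class OverrideDefaultSketch {
	public static void main(String[] args) {
		// Hypothetical option, for illustration only.
		ConfigOption<Integer> retries = ConfigOptions
			.key("example.retries")
			.defaultValue(3);

		Configuration config = new Configuration();

		// Option absent: the override replaces the option's default.
		int a = config.getInteger(retries, 10); // -> 10, not 3

		// Option present: the configured value wins over both defaults.
		config.setInteger("example.retries", 5);
		int b = config.getInteger(retries, 10); // -> 5

		System.out.println(a + " " + b);
	}
}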


[09/16] flink git commit: [FLINK-6987] Fix TextInputFormatTest for paths with spaces

Posted by ch...@apache.org.
[FLINK-6987] Fix TextInputFormatTest for paths with spaces

This closes #4168.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c84a8285
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c84a8285
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c84a8285

Branch: refs/heads/master
Commit: c84a82854e715ebdf714b4f4ea09c73645e6cd5d
Parents: bb9505d
Author: zhangminglei <zm...@163.com>
Authored: Tue Jun 27 11:34:37 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:08 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/io/TextInputFormatTest.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c84a8285/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
index 4a52eea..6bff9db 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
@@ -108,7 +108,7 @@ public class TextInputFormatTest {
 			}
 			File parentDir = new File("tmp");
 
-			TextInputFormat inputFormat = new TextInputFormat(new Path(parentDir.toURI().toString()));
+			TextInputFormat inputFormat = new TextInputFormat(new Path(parentDir.toURI()));
 			inputFormat.setNestedFileEnumeration(true);
 			inputFormat.setNumLineSamples(10);
 

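The small change above matters because File.toURI() percent-encodes spaces; feeding the encoded toString() back into Path yields a component like "tmp%20dir", while passing the URI itself preserves the decoded path. A JDK-only sketch of the difference:

import java.io.File;
import java.net.URI;

public class UriSpaceSketch {
	public static void main(String[] args) {
		URI uri = new File("/data/tmp dir").toURI();
		System.out.println(uri.toString()); // file:/data/tmp%20dir
		System.out.println(uri.getPath());  // /data/tmp dir
	}
}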

[03/16] flink git commit: [FLINK-6461] Replace usages of deprecated web port key

Posted by ch...@apache.org.
[FLINK-6461] Replace usages of deprecated web port key

This closes #3951.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80b28056
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80b28056
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80b28056

Branch: refs/heads/master
Commit: 80b28056d1c22461d3cf2132188c3cd85f59b80d
Parents: edb79b0
Author: zentol <ch...@apache.org>
Authored: Fri May 19 15:35:26 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:07 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/webmonitor/WebRuntimeMonitorITCase.java    | 3 +--
 .../flink/runtime/clusterframework/BootstrapTools.java       | 3 ++-
 .../org/apache/flink/runtime/jobmanager/JobManager.scala     | 8 ++++----
 .../apache/flink/runtime/minicluster/FlinkMiniCluster.scala  | 4 ++--
 .../runtime/jobmanager/JobManagerProcessReapingTest.java     | 4 ++--
 .../apache/flink/runtime/testutils/ZooKeeperTestUtils.java   | 4 ++--
 .../flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java | 5 +++--
 .../org/apache/flink/yarn/YarnApplicationMasterRunner.java   | 4 ++--
 8 files changed, 18 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/80b28056/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 9ee215a..6da11d1 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -168,7 +167,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			String[] jobManagerAddress = new String[2];
 			for (int i = 0; i < jobManager.length; i++) {
 				Configuration jmConfig = config.clone();
-				jmConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+				jmConfig.setInteger(JobManagerOptions.WEB_PORT,
 						webMonitor[i].getServerPort());
 
 				jobManager[i] = JobManager.startJobManagerActors(

http://git-wip-us.apache.org/repos/asf/flink/blob/80b28056/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 5bdfe1a..b8e5351 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
@@ -187,7 +188,7 @@ public class BootstrapTools {
 		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host().get());
 		config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port().get().toString());
 
-		if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+		if (config.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) {
 			logger.info("Starting JobManager Web Frontend");
 
 			// start the web frontend. we need to load this dynamically

http://git-wip-us.apache.org/repos/asf/flink/blob/80b28056/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index f2934b5..071dd02 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -173,7 +173,7 @@ class JobManager(
    * to run in the actor system of the associated job manager.
    */
   val webMonitorPort : Int = flinkConfiguration.getInteger(
-    ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
+    JobManagerOptions.WEB_PORT.key(), -1)
 
   /** The default directory for savepoints. */
   val defaultSavepointDir: String = ConfigurationUtil.getStringWithDeprecatedKeys(
@@ -2272,7 +2272,7 @@ object JobManager {
     : (ActorRef, ActorRef, Option[WebMonitor], Option[ActorRef]) = {
 
     val webMonitor: Option[WebMonitor] =
-      if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+      if (configuration.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) {
         LOG.info("Starting JobManager web frontend")
 
         // start the web frontend. we need to load this dynamically
@@ -2290,7 +2290,7 @@ object JobManager {
 
     // Reset the port (necessary in case of automatic port selection)
     webMonitor.foreach{ monitor => configuration.setInteger(
-      ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, monitor.getServerPort) }
+      JobManagerOptions.WEB_PORT, monitor.getServerPort) }
 
     try {
       // bring up the job manager actor
@@ -2451,7 +2451,7 @@ object JobManager {
     }
 
     if (cliOptions.getWebUIPort() >= 0) {
-      configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, cliOptions.getWebUIPort())
+      configuration.setInteger(JobManagerOptions.WEB_PORT, cliOptions.getWebUIPort())
     }
 
     if (cliOptions.getHost() != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/80b28056/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index abc8946..d66d106 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -27,7 +27,7 @@ import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
 import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
@@ -385,7 +385,7 @@ abstract class FlinkMiniCluster(
     : Option[WebMonitor] = {
     if(
       config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
-        config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+        config.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) {
 
       LOG.info("Starting JobManager web frontend")
       // start the new web frontend. we need to load this dynamically

http://git-wip-us.apache.org/repos/asf/flink/blob/80b28056/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index 35ac1e5..df53d59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -26,7 +26,7 @@ import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -203,7 +203,7 @@ public class JobManagerProcessReapingTest extends TestLogger {
 		public static void main(String[] args) {
 			try {
 				Configuration config = new Configuration();
-				config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
+				config.setInteger(JobManagerOptions.WEB_PORT, -1);
 
 				JobManager.runJobManager(config, JobManagerMode.CLUSTER, "localhost", 0);
 				System.exit(0);

http://git-wip-us.apache.org/repos/asf/flink/blob/80b28056/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 48eb392..9435ebf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 
@@ -66,7 +66,7 @@ public class ZooKeeperTestUtils {
 		checkNotNull(fsStateHandlePath, "File state handle backend path");
 
 		// Web frontend, you have been dismissed. Sorry.
-		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
+		config.setInteger(JobManagerOptions.WEB_PORT, -1);
 
 		// ZooKeeper recovery mode
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");

http://git-wip-us.apache.org/repos/asf/flink/blob/80b28056/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
index 8bf6a2e..85b51a3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -189,8 +190,8 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
 		}
 
 		// if a web monitor shall be started, set the port to random binding
-		if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
-			configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+		if (configuration.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) {
+			configuration.setInteger(JobManagerOptions.WEB_PORT, 0);
 		}
 
 		// if the user has set the deprecated YARN-specific config keys, we add the

http://git-wip-us.apache.org/repos/asf/flink/blob/80b28056/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index a424740..dccbb71 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -510,8 +510,8 @@ public class YarnApplicationMasterRunner {
 		}
 
 		// if a web monitor shall be started, set the port to random binding
-		if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
-			configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+		if (configuration.getInteger(JobManagerOptions.WEB_PORT.key(), 0) >= 0) {
+			configuration.setInteger(JobManagerOptions.WEB_PORT, 0);
 		}
 
 		// if the user has set the deprecated YARN-specific config keys, we add the
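
The key replacement stays transparent to old flink-conf.yaml files because a ConfigOption can carry deprecated keys that are consulted when the current key is unset. A hedged sketch of that mechanism (the keys shown are illustrative, not the actual WEB_PORT definition):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class DeprecatedKeySketch {
	public static void main(String[] args) {
		// Illustrative option; the real WEB_PORT definition may differ.
		ConfigOption<Integer> webPort = ConfigOptions
			.key("web.port")
			.defaultValue(8081)
			.withDeprecatedKeys("jobmanager.web.port");

		Configuration config = new Configuration();
		config.setInteger("jobmanager.web.port", 9091); // legacy write

		// Reads through the option still honor the deprecated key.
		System.out.println(config.getInteger(webPort)); // -> 9091
	}
}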


[10/16] flink git commit: [FLINK-6742] Add eager checks for parallelism/chain-length change

Posted by ch...@apache.org.
[FLINK-6742] Add eager checks for parallelism/chain-length change

This closes #4185.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd458259
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd458259
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd458259

Branch: refs/heads/master
Commit: cd45825958297a0da157f850c3c63e93dbadfb7a
Parents: 60a59d2
Author: zentol <ch...@apache.org>
Authored: Mon Jun 26 13:38:54 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:08 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/savepoint/SavepointV2.java      | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd458259/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
index 5e46f93..bd364a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -178,6 +179,18 @@ public class SavepointV2 implements Savepoint {
 
 			List<OperatorID> operatorIDs = jobVertex.getOperatorIDs();
 
+			Preconditions.checkArgument(
+				jobVertex.getParallelism() == taskState.getParallelism(),
+				"Detected change in parallelism during migration for task " + jobVertex.getJobVertexId() + ". " +
+					"When migrating a savepoint from a version < 1.3 please make sure that no changes were made " +
+					"to the parallelism of stateful operators.");
+
+			Preconditions.checkArgument(
+				operatorIDs.size() == taskState.getChainLength(),
+				"Detected change in chain length during migration for task " + jobVertex.getJobVertexId() + ". " +
+					"When migrating a savepoint from a version < 1.3 please make sure that the topology was not " +
+					"changed by modification of a chain containing a stateful operator.");
+
 			for (int subtaskIndex = 0; subtaskIndex < jobVertex.getParallelism(); subtaskIndex++) {
 				SubtaskState subtaskState;
 				try {
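
Preconditions.checkArgument turns each condition into an IllegalArgumentException carrying the given message, so an incompatible savepoint now fails the restore eagerly with an actionable hint instead of surfacing as a harder-to-trace error later. A minimal sketch of the failure mode (values made up):

import org.apache.flink.util.Preconditions;

public class EagerCheckSketch {
	public static void main(String[] args) {
		int savepointParallelism = 4; // recorded in the savepoint
		int jobParallelism = 8;       // current job graph

		// Throws IllegalArgumentException with the message below.
		Preconditions.checkArgument(
			jobParallelism == savepointParallelism,
			"Detected change in parallelism during migration.");
	}
}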


[05/16] flink git commit: [FLINK-7000] Add custom configuration local environment in Scala StreamExecutionEnvironment

Posted by ch...@apache.org.
[FLINK-7000] Add custom configuration local environment in Scala StreamExecutionEnvironment

This closes #4178.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acc2e34f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acc2e34f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acc2e34f

Branch: refs/heads/master
Commit: acc2e34f5045a0e239eb4e3b8affdfa69530c0a9
Parents: 80b2805
Author: Lim Chee Hau <ch...@gmail.com>
Authored: Sat Jun 24 16:26:15 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:07 2017 +0200

----------------------------------------------------------------------
 .../api/scala/StreamExecutionEnvironment.scala          | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/acc2e34f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 742baf9..9fd03c3 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -768,6 +768,18 @@ object StreamExecutionEnvironment {
   }
 
   /**
+   * Creates a local execution environment. The local execution environment will run the
+   * program in a multi-threaded fashion in the same JVM as the environment was created in.
+   *
+   * @param parallelism   The parallelism for the local environment.
+   * @param configuration Pass a custom configuration into the cluster.
+   */
+  def createLocalEnvironment(parallelism: Int, configuration: Configuration)
+    : StreamExecutionEnvironment = {
+    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism, configuration))
+  }
+
+  /**
    * Creates a [[StreamExecutionEnvironment]] for local program execution that also starts the
    * web monitoring UI.
    *
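
The Scala method above is a thin wrapper around the Java factory of the same shape. A hedged Java sketch of the equivalent call (the config entry is just an example of something one might pass):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalEnvSketch {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.setString("taskmanager.memory.size", "64"); // example entry only

		StreamExecutionEnvironment env =
			StreamExecutionEnvironment.createLocalEnvironment(2, conf);

		env.fromElements(1, 2, 3).print();
		env.execute("local-env-sketch");
	}
}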


[11/16] flink git commit: [FLINK-5918] [runtime] port range support for taskmanager.rpc.port

Posted by ch...@apache.org.
[FLINK-5918] [runtime] port range support for taskmanager.rpc.port

This closes #3416.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f0ac26e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f0ac26e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f0ac26e

Branch: refs/heads/master
Commit: 3f0ac26e9f502f9e032af0375d52c5e4af2126f3
Parents: c84a828
Author: fengyelei <fe...@huawei.com>
Authored: Mon Feb 27 09:51:22 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:04:05 2017 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            |  2 +-
 .../flink/configuration/ConfigConstants.java    |  7 +-
 .../flink/configuration/TaskManagerOptions.java |  8 ++
 .../java/org/apache/flink/util/NetUtils.java    | 13 +++
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 51 +++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   | 54 +-----------
 .../runtime/minicluster/FlinkMiniCluster.scala  | 13 ++-
 .../minicluster/LocalFlinkMiniCluster.scala     | 14 +--
 .../flink/runtime/taskmanager/TaskManager.scala | 89 +++++++++++++++++---
 .../TaskManagerConfigurationTest.java           | 33 +++++---
 10 files changed, 196 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 8a6f67d..1511f34 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -247,7 +247,7 @@ The following parameters configure Flink's JobManager and TaskManagers.
 
 - `taskmanager.hostname`: The hostname of the network interface that the TaskManager binds to. By default, the TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers. This option can be used to define a hostname if that strategy fails for some reason. Because different TaskManagers need different values for this option, it usually is specified in an additional non-shared TaskManager-specific config file.
 
-- `taskmanager.rpc.port`: The task manager's IPC port (DEFAULT: **0**, which lets the OS choose a free port).
+- `taskmanager.rpc.port`: The task manager's IPC port (DEFAULT: **0**, which lets the OS choose a free port). Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.
 
 - `taskmanager.data.port`: The task manager's port used for data exchange operations (DEFAULT: **0**, which lets the OS choose a free port).
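
A standalone sketch of the port grammar described above ("50100,50101", "50100-50200", or a mix), with the same 0-65535 validation the NetUtils change below enforces; this is an illustration, not the actual NetUtils code:

import java.util.ArrayList;
import java.util.List;

public class PortRangeSketch {
	static List<Integer> parse(String spec) {
		List<Integer> ports = new ArrayList<>();
		for (String range : spec.split(",")) {
			int dash = range.indexOf('-');
			if (dash == -1) {
				// single port
				ports.add(check(Integer.parseInt(range.trim())));
			} else {
				// inclusive range
				int start = check(Integer.parseInt(range.substring(0, dash).trim()));
				int end = check(Integer.parseInt(range.substring(dash + 1).trim()));
				for (int p = start; p <= end; p++) {
					ports.add(p);
				}
			}
		}
		return ports;
	}

	static int check(int port) {
		if (port < 0 || port > 65535) {
			throw new IllegalArgumentException("Port out of range: " + port);
		}
		return port;
	}

	public static void main(String[] args) {
		System.out.println(parse("50100,50105-50107")); // [50100, 50105, 50106, 50107]
	}
}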
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d467dfa..a7a883f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -185,8 +185,9 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_HOSTNAME_KEY = "taskmanager.hostname";
 
 	/**
-	 * The config parameter defining the task manager's IPC port from the configuration.
+	 * @deprecated use {@link TaskManagerOptions#RPC_PORT} instead
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_IPC_PORT_KEY = "taskmanager.rpc.port";
 
 	/**
@@ -1243,9 +1244,9 @@ public final class ConfigConstants {
 	public static final String DEFAULT_BLOB_SERVER_PORT = "0";
 
 	/**
-	 * The default network port the task manager expects incoming IPC connections. The {@code 0} means that
-	 * the TaskManager searches for a free port.
+	 * @deprecated use {@link TaskManagerOptions#RPC_PORT} instead
 	 */
+	@Deprecated
 	public static final int DEFAULT_TASK_MANAGER_IPC_PORT = 0;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index bde564a..fef0975 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -57,6 +57,14 @@ public class TaskManagerOptions {
 			key("taskmanager.exit-on-fatal-akka-error")
 			.defaultValue(false);
 
+	/**
+	 * The network port range on which the task manager expects incoming IPC connections. The default
+	 * {@code "0"} means that the TaskManager searches for a free port.
+	 */
+	public static final ConfigOption<String> RPC_PORT = 
+		key("taskmanager.rpc.port")
+			.defaultValue("0");
+
 	// ------------------------------------------------------------------------
 	//  Managed Memory Options
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index d4437e4..f56b452 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -319,11 +319,24 @@ public class NetUtils {
 			int dashIdx = range.indexOf('-');
 			if (dashIdx == -1) {
 				// only one port in range:
+				final int port = Integer.valueOf(range);
+				if (port < 0 || port > 65535) {
+					throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0 " +
+						"and 65535, but was " + port + ".");
+				}
 				rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
 			} else {
 				// evaluate range
 				final int start = Integer.valueOf(range.substring(0, dashIdx));
+				if (start < 0 || start > 65535) {
+					throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0 " +
+						"and 65535, but was " + start + ".");
+				}
 				final int end = Integer.valueOf(range.substring(dashIdx+1, range.length()));
+				if (end < 0 || end > 65535) {
+					throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0 " +
+						"and 65535, but was " + end + ".");
+				}
 				rangeIterator = new Iterator<Integer>() {
 					int i = start;
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 2f8445a..b74a9a3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -29,9 +29,11 @@ import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{AkkaOptions, Configuration, SecurityOptions}
 import org.apache.flink.runtime.net.SSLUtils
 import org.apache.flink.util.NetUtils
+import org.jboss.netty.channel.ChannelException
 import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
 import org.slf4j.LoggerFactory
 
+import scala.annotation.tailrec
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -682,5 +684,54 @@ object AkkaUtils {
   def getLocalAkkaURL(actorName: String): String = {
     "akka://flink/user/" + actorName
   }
+
+  /**
+    * Retries a function if it fails because of a [[java.net.BindException]].
+    *
+    * @param fn The function to retry
+    * @param stopCond Flag to signal termination
+    * @param maxSleepBetweenRetries Max random sleep time between retries
+    * @tparam T Return type of the function to retry
+    * @return Return value of the function to retry
+    */
+  @tailrec
+  def retryOnBindException[T](
+      fn: => T,
+      stopCond: => Boolean,
+      maxSleepBetweenRetries : Long = 0 )
+    : scala.util.Try[T] = {
+
+    def sleepBeforeRetry() : Unit = {
+      if (maxSleepBetweenRetries > 0) {
+        val sleepTime = (Math.random() * maxSleepBetweenRetries).asInstanceOf[Long]
+        LOG.info(s"Retrying after bind exception. Sleeping for $sleepTime ms.")
+        Thread.sleep(sleepTime)
+      }
+    }
+
+    scala.util.Try {
+      fn
+    } match {
+      case scala.util.Failure(x: BindException) =>
+        if (stopCond) {
+          scala.util.Failure(x)
+        } else {
+          sleepBeforeRetry()
+          retryOnBindException(fn, stopCond)
+        }
+      case scala.util.Failure(x: Exception) => x.getCause match {
+        case c: ChannelException =>
+          if (stopCond) {
+            scala.util.Failure(new RuntimeException(
+              "Unable to do further retries starting the actor system"))
+          } else {
+            sleepBeforeRetry()
+            retryOnBindException(fn, stopCond)
+          }
+        case _ => scala.util.Failure(x)
+      }
+      case f => f
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 071dd02..100199b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -86,9 +86,7 @@ import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
-import org.jboss.netty.channel.ChannelException
 
-import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent._
@@ -2114,7 +2112,7 @@ object JobManager {
       listeningPortRange: java.util.Iterator[Integer])
     : Unit = {
 
-    val result = retryOnBindException({
+    val result = AkkaUtils.retryOnBindException({
       // Try all ports in the range until successful
       val socket = NetUtils.createSocketFromPorts(
         listeningPortRange,
@@ -2146,56 +2144,6 @@ object JobManager {
   }
 
   /**
-    * Retries a function if it fails because of a [[java.net.BindException]].
-    *
-    * @param fn The function to retry
-    * @param stopCond Flag to signal termination
-    * @param maxSleepBetweenRetries Max random sleep time between retries
-    * @tparam T Return type of the the function to retry
-    * @return Return value of the the function to retry
-    */
-  @tailrec
-  def retryOnBindException[T](
-      fn: => T,
-      stopCond: => Boolean,
-      maxSleepBetweenRetries : Long = 0 )
-    : scala.util.Try[T] = {
-
-    def sleepBeforeRetry() : Unit = {
-      if (maxSleepBetweenRetries > 0) {
-        val sleepTime = (Math.random() * maxSleepBetweenRetries).asInstanceOf[Long]
-        LOG.info(s"Retrying after bind exception. Sleeping for $sleepTime ms.")
-        Thread.sleep(sleepTime)
-      }
-    }
-
-    scala.util.Try {
-      fn
-    } match {
-      case scala.util.Failure(x: BindException) =>
-        if (stopCond) {
-          scala.util.Failure(new RuntimeException(
-            "Unable to do further retries starting the actor system"))
-        } else {
-          sleepBeforeRetry()
-          retryOnBindException(fn, stopCond)
-        }
-      case scala.util.Failure(x: Exception) => x.getCause match {
-        case c: ChannelException =>
-          if (stopCond) {
-            scala.util.Failure(new RuntimeException(
-              "Unable to do further retries starting the actor system"))
-          } else {
-            sleepBeforeRetry()
-            retryOnBindException(fn, stopCond)
-          }
-        case _ => scala.util.Failure(x)
-      }
-      case f => f
-    }
-  }
-
-  /**
     * Starts the JobManager actor system.
     *
     * @param configuration Configuration to use for the job manager actor system

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index d66d106..a829059 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -27,7 +27,7 @@ import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
 import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
-import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions}
+import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, Configuration, JobManagerOptions, TaskManagerOptions}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, Leader
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
+import org.apache.flink.util.NetUtils
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -250,10 +251,14 @@ abstract class FlinkMiniCluster(
   }
 
   def getTaskManagerAkkaConfig(index: Int): Config = {
-    val port = originalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
-                                                ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+    val portRange = originalConfiguration.getString(TaskManagerOptions.RPC_PORT)
 
-    val resolvedPort = if(port != 0) port + index else port
+    val portRangeIterator = NetUtils.getPortRangeFromString(portRange)
+
+    val resolvedPort = if (portRangeIterator.hasNext) {
+      val port = portRangeIterator.next()
+      if (port > 0) port + index else 0
+    } else 0
 
     AkkaUtils.getAkkaConfig(originalConfiguration, Some((hostname, resolvedPort)))
   }
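
For reference, a sketch of how the RPC_PORT string is consumed (editorial
illustration; the concrete range is hypothetical):

    import org.apache.flink.util.NetUtils

    // "50100-50102" yields 50100, 50101, 50102 in order; a single value
    // such as "0" yields exactly that one port.
    val ports: java.util.Iterator[Integer] = NetUtils.getPortRangeFromString("50100-50102")
    while (ports.hasNext) {
      println(ports.next())
    }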

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index a535388..a3e1c78 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.util.NetUtils
 
 import scala.concurrent.{Await, ExecutionContext}
 import scala.concurrent.duration.FiniteDuration
@@ -203,16 +204,19 @@ class LocalFlinkMiniCluster(
   override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
     val config = originalConfiguration.clone()
 
-    val rpcPort = config.getInteger(
-      ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+    val rpcPortRange = config.getString(TaskManagerOptions.RPC_PORT)
+
+    val rpcPortIterator = NetUtils.getPortRangeFromString(rpcPortRange)
 
     val dataPort = config.getInteger(
       ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
       ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
 
-    if (rpcPort > 0) {
-      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
+    if (rpcPortIterator.hasNext) {
+      val rpcPort = rpcPortIterator.next()
+      if (rpcPort > 0) {
+        config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
+      }
     }
     if (dataPort > 0) {
       config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7684a6b..0c419eb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskmanager
 
 import java.io.{File, FileInputStream, IOException}
 import java.lang.management.ManagementFactory
-import java.net.{InetAddress, InetSocketAddress}
+import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket}
 import java.util
 import java.util.concurrent.{Callable, TimeUnit}
 import java.util.{Collections, UUID}
@@ -70,6 +70,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
+import org.apache.flink.util.NetUtils
 
 import scala.collection.JavaConverters._
 import scala.concurrent._
@@ -1665,7 +1666,7 @@ object TaskManager {
       Executors.directExecutor(),
       AddressResolution.TRY_ADDRESS_RESOLUTION)
 
-    val (taskManagerHostname, actorSystemPort) = selectNetworkInterfaceAndPort(
+    val (taskManagerHostname, actorSystemPortRange) = selectNetworkInterfaceAndPortRange(
       configuration,
       highAvailabilityServices)
 
@@ -1673,7 +1674,7 @@ object TaskManager {
       runTaskManager(
         taskManagerHostname,
         resourceID,
-        actorSystemPort,
+        actorSystemPortRange,
         configuration,
         highAvailabilityServices,
         taskManagerClass)
@@ -1688,10 +1689,10 @@ object TaskManager {
 
   @throws(classOf[IOException])
   @throws(classOf[IllegalConfigurationException])
-  def selectNetworkInterfaceAndPort(
+  def selectNetworkInterfaceAndPortRange(
       configuration: Configuration,
       highAvailabilityServices: HighAvailabilityServices)
-    : (String, Int) = {
+    : (String, java.util.Iterator[Integer]) = {
 
     var taskManagerHostname = configuration.getString(
       ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null)
@@ -1713,15 +1714,20 @@ object TaskManager {
     }
 
     // if no task manager port has been configured, use 0 (system will pick any free port)
-    val actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0)
-    if (actorSystemPort < 0 || actorSystemPort > 65535) {
-      throw new IllegalConfigurationException("Invalid value for '" +
-        ConfigConstants.TASK_MANAGER_IPC_PORT_KEY +
-        "' (port for the TaskManager actor system) : " + actorSystemPort +
-        " - Leave config parameter empty or use 0 to let the system choose a port automatically.")
+    val portRange = configuration.getString(TaskManagerOptions.RPC_PORT)
+
+    val portRangeIterator = try {
+      NetUtils.getPortRangeFromString(portRange)
+    } catch {
+      case _: NumberFormatException =>
+        throw new IllegalConfigurationException("Invalid value for '" +
+          TaskManagerOptions.RPC_PORT.key() +
+          "' (port for the TaskManager actor system) : " + portRange +
+          " - Leave config parameter empty or use 0 to let the system choose a port automatically.")
     }
 
-    (taskManagerHostname, actorSystemPort)
+    (taskManagerHostname, portRangeIterator)
+
   }
 
   /**
@@ -1876,6 +1882,65 @@ object TaskManager {
   }
 
   /**
+    * Starts and runs the TaskManager, with all its components trying to bind to
+    * a port in the specified range.
+    *
+    * @param taskManagerHostname The hostname/address of the interface where the actor system
+    *                         will communicate.
+    * @param resourceID The id of the resource which the task manager will run on.
+    * @param actorSystemPortRange The range of ports from which the actor system will pick a free port.
+    * @param configuration The configuration for the TaskManager.
+    * @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager
+    *                         subclasses for example for YARN.
+    */
+  @throws(classOf[Exception])
+  def runTaskManager(
+    taskManagerHostname: String,
+    resourceID: ResourceID,
+    actorSystemPortRange: java.util.Iterator[Integer],
+    configuration: Configuration,
+    highAvailabilityServices: HighAvailabilityServices,
+    taskManagerClass: Class[_ <: TaskManager])
+    : Unit = {
+
+    val result = AkkaUtils.retryOnBindException({
+      // Try all ports in the range until successful
+      val socket = NetUtils.createSocketFromPorts(
+        actorSystemPortRange,
+        new NetUtils.SocketFactory {
+          override def createSocket(port: Int): ServerSocket = new ServerSocket(
+            // Use the correct listening address, bound ports will only be
+            // detected later by Akka.
+            port, 0, InetAddress.getByName(NetUtils.getWildcardIPAddress))
+        })
+
+      val port =
+        if (socket == null) {
+          throw new BindException(s"Unable to allocate port for TaskManager.")
+        } else {
+          try {
+            socket.getLocalPort()
+          } finally {
+            socket.close()
+          }
+        }
+
+      runTaskManager(
+        taskManagerHostname,
+        resourceID,
+        port,
+        configuration,
+        highAvailabilityServices,
+        taskManagerClass)
+    }, { !actorSystemPortRange.hasNext }, 5000)
+
+    result match {
+      case scala.util.Failure(f) => throw f
+      case _ =>
+    }
+  }
+
+  /**
    * Starts the task manager actor.
    *
    * @param configuration The configuration for the TaskManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/3f0ac26e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
index a760760..69cadfb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -36,6 +37,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.lang.reflect.Field;
 import java.net.*;
+import java.util.Iterator;
 import java.util.UUID;
 
 import static org.junit.Assert.*;
@@ -62,8 +64,7 @@ public class TaskManagerConfigurationTest {
 
 		try {
 
-
-			Tuple2<String, Object> address = TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices);
+			Tuple2<String, Iterator<Integer>> address = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices);
 
 			// validate the configured test host name
 			assertEquals(TEST_HOST_NAME, address._1());
@@ -91,17 +92,29 @@ public class TaskManagerConfigurationTest {
 
 		try {
 			// auto port
-			assertEquals(0, TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices)._2());
+			Iterator<Integer> portsIter = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._2();
+			assertTrue(portsIter.hasNext());
+			assertEquals(0, (int) portsIter.next());
 
 			// pre-defined port
 			final int testPort = 22551;
-			config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, testPort);
-			assertEquals(testPort, TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices)._2());
+			config.setString(TaskManagerOptions.RPC_PORT, String.valueOf(testPort));
+
+			portsIter = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._2();
+			assertTrue(portsIter.hasNext());
+			assertEquals(testPort, (int) portsIter.next());
+
+			// port range
+			config.setString(TaskManagerOptions.RPC_PORT, "8000-8001");
+			portsIter = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._2();
+			assertTrue(portsIter.hasNext());
+			assertEquals(8000, (int) portsIter.next());
+			assertEquals(8001, (int) portsIter.next());
 
 			// invalid port
 			try {
-				config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1);
-				TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices);
+				config.setString(TaskManagerOptions.RPC_PORT, "-1");
+				TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices);
 				fail("should fail with an exception");
 			}
 			catch (IllegalConfigurationException e) {
@@ -110,8 +123,8 @@ public class TaskManagerConfigurationTest {
 
 			// invalid port
 			try {
-				config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 100000);
-				TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices);
+				config.setString(TaskManagerOptions.RPC_PORT, "100000");
+				TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices);
 				fail("should fail with an exception");
 			}
 			catch (IllegalConfigurationException e) {
@@ -180,7 +193,7 @@ public class TaskManagerConfigurationTest {
 			HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
 		try {
-			assertNotNull(TaskManager.selectNetworkInterfaceAndPort(config, highAvailabilityServices)._1());
+			assertNotNull(TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._1());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
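
End to end, the range travels from the configuration to the port selection
roughly as follows (editorial sketch in Scala, mirroring the test above; the
host name, range, and the HighAvailabilityServicesUtils import path are
assumptions):

    import org.apache.flink.configuration.{ConfigConstants, Configuration, TaskManagerOptions}
    import org.apache.flink.runtime.concurrent.Executors
    import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils
    import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
    import org.apache.flink.runtime.taskmanager.TaskManager

    val config = new Configuration()
    config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost")
    config.setString(TaskManagerOptions.RPC_PORT, "50100-50200")

    val haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
      config, Executors.directExecutor(), AddressResolution.TRY_ADDRESS_RESOLUTION)

    // Returns the host name and an iterator over candidate ports;
    // runTaskManager then tries them in order until one binds.
    val (host, portRange) =
      TaskManager.selectNetworkInterfaceAndPortRange(config, haServices)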


[12/16] flink git commit: [FLINK-6655] Add validateAndNormalizeUri method to MemoryArchivist

Posted by ch...@apache.org.
[FLINK-6655] Add validateAndNormalizeUri method to MemoryArchivist

This closes #4156.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39562691
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39562691
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39562691

Branch: refs/heads/master
Commit: 39562691b160e3061794ac6605b5a9c1f031b548
Parents: 3f0ac26
Author: zhangminglei <zm...@163.com>
Authored: Tue Jun 27 21:26:18 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:04:05 2017 +0200

----------------------------------------------------------------------
 .../runtime/jobmanager/MemoryArchivist.scala    | 66 ++++++++++++++++----
 1 file changed, 55 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39562691/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index d83f2cd..327e2a3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.jobmanager
 
+import java.io.IOException
+import java.net.URI
 import java.util
 
 import akka.actor.ActorRef
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.fs.Path
+import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, Executio
 import org.apache.flink.runtime.history.FsJobArchivist
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
 
 import scala.collection.mutable
 import scala.concurrent.future
@@ -86,7 +86,7 @@ class MemoryArchivist(
   }
 
   override def handleMessage: Receive = {
-    
+
     /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) =>
       // Keep lru order in case we override a graph (from multiple job submission in one session).
@@ -109,7 +109,7 @@ class MemoryArchivist(
       trimHistory()
 
     case msg : InfoMessage => handleWebServerInfoMessage(msg, sender())
-      
+
     case RequestArchivedJob(jobID: JobID) =>
       val graph = graphs.get(jobID)
       sender ! decorateMessage(ArchivedJob(graph))
@@ -165,7 +165,7 @@ class MemoryArchivist(
     throw new RuntimeException("Received unknown message " + message)
   }
 
-  
+
   private def handleWebServerInfoMessage(message: InfoMessage, theSender: ActorRef): Unit = {
     message match {
       case _ : RequestJobsOverview =>
@@ -175,7 +175,7 @@ class MemoryArchivist(
         catch {
           case t: Throwable => log.error("Exception while creating the jobs overview", t)
         }
-  
+
       case _ : RequestJobsWithIDsOverview =>
         try {
           sender ! decorateMessage(createJobsWithIDsOverview())
@@ -188,7 +188,7 @@ class MemoryArchivist(
         val details = graphs.values.map {
           v => WebMonitorUtils.createDetailsForJob(v)
         }.toArray[JobDetails]
-        
+
         theSender ! decorateMessage(new MultipleJobsDetails(null, details))
     }
   }
@@ -198,7 +198,7 @@ class MemoryArchivist(
     // so we aren't archiving it yet.
     if (archivePath.isDefined && graph.getState.isGloballyTerminalState) {
       try {
-        val p = FsStateBackend.validateAndNormalizeUri(archivePath.get.toUri)
+        val p = validateAndNormalizeUri(archivePath.get.toUri)
         future {
           try {
             FsJobArchivist.archiveJob(p, graph)
@@ -217,7 +217,7 @@ class MemoryArchivist(
   // --------------------------------------------------------------------------
   //  Request Responses
   // --------------------------------------------------------------------------
-  
+
   private def createJobsOverview() : JobsOverview = {
     new JobsOverview(0, finishedCnt, canceledCnt, failedCnt)
   }
@@ -239,7 +239,7 @@ class MemoryArchivist(
 
     new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
   }
-  
+
   // --------------------------------------------------------------------------
   //  Utilities
   // --------------------------------------------------------------------------
@@ -255,4 +255,48 @@ class MemoryArchivist(
       graphs.remove(jobID)
     }
   }
+
+  /**
+    * Checks and normalizes the archive path URI. This method first checks the validity of the
+    * URI (scheme, path, availability of a matching file system) and then normalizes the URI
+    * to a path.
+    *
+    * If the URI does not include an authority, but the file system configured for the URI has an
+    * authority, then the normalized path will include this authority.
+    *
+    * @param archivePathUri The URI to check and normalize.
+    * @return a normalized URI as a Path.
+    *
+    * @throws IllegalArgumentException Thrown, if the URI is missing a scheme or a path.
+    * @throws IOException Thrown, if no file system can be found for the URI's scheme.
+    */
+  @throws[IOException]
+  private def validateAndNormalizeUri(archivePathUri: URI): Path = {
+    val scheme = archivePathUri.getScheme
+    val path = archivePathUri.getPath
+
+    // some validity checks
+    if (scheme == null) {
+      throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
+        "Please specify the file system scheme explicitly in the URI: " + archivePathUri)
+    }
+
+    if (path == null) {
+      throw new IllegalArgumentException("The path to store the job archives is null. " +
+        "Please specify a directory path for storing job archives. and the URI is: " +
+        archivePathUri)
+    }
+
+    if (path.length == 0 || path == "/") {
+      throw new IllegalArgumentException("Cannot use the root directory for storing job archives.")
+    }
+
+    if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
+      // reject schemes that are not supported by Flink's file system abstraction,
+      // since the required filesystem classes may not be available to the flink client
+      throw new IllegalArgumentException("No file system found with scheme " + scheme
+        + ", referenced in file URI '" + archivePathUri.toString + "'.")
+    }
+    new Path(archivePathUri)
+  }
 }
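
To make the accepted shapes concrete (editorial sketch; host and paths are
hypothetical):

    import java.net.URI

    new URI("hdfs://namenode:9000/flink/completed-jobs") // accepted: scheme and non-root path
    new URI("/flink/completed-jobs")                     // rejected: scheme is null
    new URI("hdfs://namenode:9000/")                     // rejected: root directory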


[15/16] flink git commit: [FLINK-7048] [travis] Define javadoc skipping in travis watchdog script

Posted by ch...@apache.org.
[FLINK-7048] [travis] Define javadoc skipping in travis watchdog script

This closes #4227.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95d70db3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95d70db3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95d70db3

Branch: refs/heads/master
Commit: 95d70db36ca05b63e7bd6563ff534038b765c4ed
Parents: caf149e
Author: zentol <ch...@apache.org>
Authored: Thu Jun 29 23:17:01 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:40:46 2017 +0200

----------------------------------------------------------------------
 .travis.yml                  | 24 ++++++++++++------------
 tools/travis_mvn_watchdog.sh |  2 +-
 2 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95d70db3/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index d30e172..5bfb549 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -17,32 +17,32 @@ matrix:
   include:
   # Always run test groups A and B together
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.8.0 -Pflink-fast-tests-a,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.8.0 -Pflink-fast-tests-a,include-kinesis,jdk8"
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.8.0 -Pflink-fast-tests-b,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.8.0 -Pflink-fast-tests-b,include-kinesis,jdk8"
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.8.0 -Pflink-fast-tests-c,include-kinesis,jdk8 -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.8.0 -Pflink-fast-tests-c,include-kinesis,jdk8"
 
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.7.3 -Dscala-2.10 -Pflink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.7.3 -Dscala-2.10 -Pflink-fast-tests-a,include-kinesis"
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.7.3 -Dscala-2.10 -Pflink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.7.3 -Dscala-2.10 -Pflink-fast-tests-b,include-kinesis"
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.7.3 -Dscala-2.10 -Pflink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.7.3 -Dscala-2.10 -Pflink-fast-tests-c,include-kinesis"
 
     - jdk: "openjdk7"
-      env: PROFILE="-Dhadoop.version=2.6.5 -Pflink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.6.5 -Pflink-fast-tests-a,include-kinesis"
     - jdk: "openjdk7"
-      env: PROFILE="-Dhadoop.version=2.6.5 -Pflink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.6.5 -Pflink-fast-tests-b,include-kinesis"
     - jdk: "openjdk7"
-      env: PROFILE="-Dhadoop.version=2.6.5 -Pflink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.6.5 -Pflink-fast-tests-c,include-kinesis"
 
     - jdk: "openjdk7"
-      env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.10 -Pflink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.10 -Pflink-fast-tests-a,include-kinesis"
     - jdk: "openjdk7"
-      env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.10 -Pflink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.10 -Pflink-fast-tests-b,include-kinesis"
     - jdk: "openjdk7"
-      env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.10 -Pflink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true"
+      env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.10 -Pflink-fast-tests-c,include-kinesis"
 
 git:
   depth: 100

http://git-wip-us.apache.org/repos/asf/flink/blob/95d70db3/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index 3f59cdf..836bfe1 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -45,7 +45,7 @@ LOG4J_PROPERTIES=${HERE}/log4j-travis.properties
 
 # Maven command to run. We set the forkCount manually, because otherwise Maven sees too many cores
 # on the Travis VMs.
-MVN="mvn -Dflink.forkCount=2 -Dflink.forkCountTestPackage=1 -B $PROFILE -Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES clean install"
+MVN="mvn -Dflink.forkCount=2 -Dflink.forkCountTestPackage=1 -B -Dmaven.javadoc.skip=true $PROFILE -Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES clean install"
 
 MVN_PID="${ARTIFACTS_DIR}/watchdog.mvn.pid"
 MVN_EXIT="${ARTIFACTS_DIR}/watchdog.mvn.exit"


[06/16] flink git commit: [FLINK-6785] [metrics] Fix ineffective asserts in MetricRegistryTest

Posted by ch...@apache.org.
[FLINK-6785] [metrics] Fix ineffective asserts in MetricRegistryTest

This closes #4035.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08580765
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08580765
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08580765

Branch: refs/heads/master
Commit: 08580765ee0b604a2d878d3e1f06c059d6703824
Parents: daed460
Author: zentol <ch...@apache.org>
Authored: Wed May 31 16:49:43 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:07 2017 +0200

----------------------------------------------------------------------
 .../runtime/metrics/MetricRegistryTest.java     | 63 ++++++++++++--------
 1 file changed, 38 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/08580765/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index 1de2551..0d77fbb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -170,16 +170,19 @@ public class MetricRegistryTest extends TestLogger {
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world");
 
 		new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)).shutdown();
+
+		Assert.assertEquals("hello", TestReporter2.mc.getString("arg1", null));
+		Assert.assertEquals("world", TestReporter2.mc.getString("arg2", null));
 	}
 
 	/**
-	 * Reporter that verifies whether configured arguments were properly passed.
+	 * Reporter that exposes the {@link MetricConfig} it was given.
 	 */
 	protected static class TestReporter2 extends TestReporter {
+		static MetricConfig mc;
 		@Override
 		public void open(MetricConfig config) {
-			Assert.assertEquals("hello", config.getString("arg1", null));
-			Assert.assertEquals("world", config.getString("arg2", null));
+			mc = config;
 		}
 	}
 
@@ -246,57 +249,67 @@ public class MetricRegistryTest extends TestLogger {
 
 		TaskManagerMetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
 		root.counter("rootCounter");
+
+		assertTrue(TestReporter6.addedMetric instanceof Counter);
+		assertEquals("rootCounter", TestReporter6.addedMetricName);
+
+		assertTrue(TestReporter7.addedMetric instanceof Counter);
+		assertEquals("rootCounter", TestReporter7.addedMetricName);
+
 		root.close();
 
-		assertTrue(TestReporter6.addCalled);
-		assertTrue(TestReporter6.removeCalled);
-		assertTrue(TestReporter7.addCalled);
-		assertTrue(TestReporter7.removeCalled);
+		assertTrue(TestReporter6.removedMetric instanceof Counter);
+		assertEquals("rootCounter", TestReporter6.removedMetricName);
+
+		assertTrue(TestReporter7.removedMetric instanceof Counter);
+		assertEquals("rootCounter", TestReporter7.removedMetricName);
 
 		registry.shutdown();
 	}
 
 	/**
-	 * Reporter that exposes whether it was notified of added or removed metrics.
+	 * Reporter that exposes the name and metric instance of the last metric that was added or removed.
 	 */
 	protected static class TestReporter6 extends TestReporter {
-		public static boolean addCalled = false;
-		public static boolean removeCalled = false;
+		static Metric addedMetric;
+		static String addedMetricName;
+
+		static Metric removedMetric;
+		static String removedMetricName;
 
 		@Override
 		public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
-			addCalled = true;
-			assertTrue(metric instanceof Counter);
-			assertEquals("rootCounter", metricName);
+			addedMetric = metric;
+			addedMetricName = metricName;
 		}
 
 		@Override
 		public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
-			removeCalled = true;
-			Assert.assertTrue(metric instanceof Counter);
-			Assert.assertEquals("rootCounter", metricName);
+			removedMetric = metric;
+			removedMetricName = metricName;
 		}
 	}
 
 	/**
-	 * Reporter that exposes whether it was notified of added or removed metrics.
+	 * Reporter that exposes the name and metric instance of the last metric that was added or removed.
 	 */
 	protected static class TestReporter7 extends TestReporter {
-		public static boolean addCalled = false;
-		public static boolean removeCalled = false;
+		static Metric addedMetric;
+		static String addedMetricName;
+
+		static Metric removedMetric;
+		static String removedMetricName;
 
 		@Override
 		public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
-			addCalled = true;
-			assertTrue(metric instanceof Counter);
-			assertEquals("rootCounter", metricName);
+			addedMetric = metric;
+			addedMetricName = metricName;
 		}
 
 		@Override
 		public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
-			removeCalled = true;
-			Assert.assertTrue(metric instanceof Counter);
-			Assert.assertEquals("rootCounter", metricName);
+			removedMetric = metric;
+			removedMetricName = metricName;
 		}
 	}
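
The pattern behind the fix, in miniature: assertions thrown inside a reporter
callback are presumably swallowed by the registry's error handling, so the
reporter now only records what it saw and the test asserts afterwards
(editorial sketch in Scala; names are hypothetical):

    import org.apache.flink.metrics.{Metric, MetricGroup}

    object Captured { @volatile var addedName: String = _ }

    class CapturingReporter extends TestReporter {
      override def notifyOfAddedMetric(metric: Metric, name: String, group: MetricGroup): Unit = {
        Captured.addedName = name // record only; assert later in the test body
      }
    }

    // later, in the test:
    // assertEquals("rootCounter", Captured.addedName)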
 


[16/16] flink git commit: [FLINK-7046] [travis] Hide download logging messages

Posted by ch...@apache.org.
[FLINK-7046] [travis] Hide download logging messages

This closes #4226.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e3f839a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e3f839a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e3f839a

Branch: refs/heads/master
Commit: 6e3f839ac114bae66b09401766da057d637293e7
Parents: 95d70db
Author: zentol <ch...@apache.org>
Authored: Thu Jun 29 21:31:23 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:42:23 2017 +0200

----------------------------------------------------------------------
 tools/travis_mvn_watchdog.sh | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e3f839a/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index 836bfe1..9733866 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -45,7 +45,8 @@ LOG4J_PROPERTIES=${HERE}/log4j-travis.properties
 
 # Maven command to run. We set the forkCount manually, because otherwise Maven sees too many cores
 # on the Travis VMs.
-MVN="mvn -Dflink.forkCount=2 -Dflink.forkCountTestPackage=1 -B -Dmaven.javadoc.skip=true $PROFILE -Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES clean install"
+MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
+MVN="mvn -Dflink.forkCount=2 -Dflink.forkCountTestPackage=1 -Dmaven.javadoc.skip=true -B $PROFILE $MVN_LOGGING_OPTIONS clean install"
 
 MVN_PID="${ARTIFACTS_DIR}/watchdog.mvn.pid"
 MVN_EXIT="${ARTIFACTS_DIR}/watchdog.mvn.exit"


[13/16] flink git commit: [FLINK-6908] Remove batch/streaming arg from start-cluster.sh

Posted by ch...@apache.org.
[FLINK-6908] Remove batch/streaming arg from start-cluster.sh

This closes #4197.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90a64074
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90a64074
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90a64074

Branch: refs/heads/master
Commit: 90a640748f04edfcca0b25b56df4099c0852b7c6
Parents: 3956269
Author: zentol <ch...@apache.org>
Authored: Tue Jun 27 15:55:16 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:04:06 2017 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/start-cluster.sh | 5 -----
 1 file changed, 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90a64074/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index 5d2f92b..b660f6b 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -17,11 +17,6 @@
 # limitations under the License.
 ################################################################################
 
-# Start a Flink cluster in batch or streaming mode
-USAGE="Usage: start-cluster.sh [batch|streaming]"
-
-STREAMING_MODE=$1
-
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 


[02/16] flink git commit: [FLINK-5488] Close YarnClient on error in AbstractYarnClusterDescriptor

Posted by ch...@apache.org.
[FLINK-5488] Close YarnClient on error in AbstractYarnClusterDescriptor

This closes #4022.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d2f3f65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d2f3f65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d2f3f65

Branch: refs/heads/master
Commit: 3d2f3f65f6d0722a3b93565846f31a5dd28218d1
Parents: acc2e34
Author: zjureel <zj...@gmail.com>
Authored: Wed May 31 13:13:34 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:07 2017 +0200

----------------------------------------------------------------------
 .../flink/yarn/AbstractYarnClusterDescriptor.java    | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d2f3f65/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index c4df92a..db67e9a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -392,6 +392,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	@Override
 	public YarnClusterClient retrieve(String applicationID) {
 
+		YarnClient yarnClient = null;
 		try {
 			// check if required Hadoop environment variables are set. If not, warn user
 			if (System.getenv("HADOOP_CONF_DIR") == null &&
@@ -402,7 +403,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 
 			final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
-			final YarnClient yarnClient = getYarnClient();
+			yarnClient = getYarnClient();
 			final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);
 
 			if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
@@ -420,6 +421,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 			return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
 		} catch (Exception e) {
+			if (null != yarnClient) {
+				yarnClient.stop();
+			}
 			throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
 		}
 	}
@@ -539,7 +543,14 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
 			"the resources become available.";
 		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
-		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		ClusterResourceDescription freeClusterMem;
+		try {
+			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		} catch (YarnException | IOException e) {
+			failSessionDuringDeployment(yarnClient, yarnApplication);
+			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
+		}
+
 		if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
 			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
 				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);


[04/16] flink git commit: [FLINK-6898] [metrics] Limit size of operator component in metric name

Posted by ch...@apache.org.
[FLINK-6898] [metrics] Limit size of operator component in metric name

This closes #4109.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edb79b0a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edb79b0a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edb79b0a

Branch: refs/heads/master
Commit: edb79b0a8bb2b33e1a0470b99b11274ac0e9c673
Parents: 0858076
Author: zentol <ch...@apache.org>
Authored: Mon Jun 12 15:36:25 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:07 2017 +0200

----------------------------------------------------------------------
 .../metrics/groups/AbstractMetricGroup.java      |  2 +-
 .../runtime/metrics/groups/TaskMetricGroup.java  |  6 ++++++
 .../metrics/groups/TaskMetricGroupTest.java      | 19 +++++++++++++++++++
 3 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/edb79b0a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index c67c5ea..ab59977 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -63,7 +63,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> implements MetricGroup {
 
-	private static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
+	protected static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class);
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/edb79b0a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index cb7aaa0..338fc20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -42,6 +42,8 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 
 	private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
 
+	static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
+
 	private final TaskIOMetricGroup ioMetrics;
 
 	/** The execution Id uniquely identifying the executed task represented by this metrics group. */
@@ -131,6 +133,10 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 	// ------------------------------------------------------------------------
 
 	public OperatorMetricGroup addOperator(String name) {
+		if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
+			LOG.warn("The operator name {} exceeded the {} character length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH);
+			name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
+		}
 		OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name);
 
 		synchronized (this) {

http://git-wip-us.apache.org/repos/asf/flink/blob/edb79b0a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index 22b0a1a..47fc98b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -25,10 +25,12 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -150,6 +152,23 @@ public class TaskMetricGroupTest extends TestLogger {
 		registry.shutdown();
 	}
 
+	@Test
+	public void testOperatorNameTruncation() {
+		Configuration cfg = new Configuration();
+		cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, ScopeFormat.SCOPE_OPERATOR_NAME);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, new JobID(), "jobname");
+		TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new AbstractID(), new AbstractID(), "task", 0, 0);
+
+		String originalName = new String(new char[100]).replace("\0", "-");
+		OperatorMetricGroup operatorMetricGroup = taskMetricGroup.addOperator(originalName);
+
+		String storedName = operatorMetricGroup.getScopeComponents()[0];
+		Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, storedName.length());
+		Assert.assertEquals(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH), storedName);
+	}
+
 	private static class CountingMetricRegistry extends MetricRegistry {
 
 		private int counter = 0;