You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/10 07:06:29 UTC

[1/7] flink git commit: [FLINK-6461] Deprecate web config defaults in ConfigConstants

Repository: flink
Updated Branches:
  refs/heads/master b78753621 -> 54ceec16c


[FLINK-6461] Deprecate web config defaults in ConfigConstants

This closes #3831.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1025470
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1025470
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1025470

Branch: refs/heads/master
Commit: c1025470a91376e251f966fc58cb5cb1c59c7c66
Parents: 4ab3938
Author: zentol <ch...@apache.org>
Authored: Fri May 5 12:39:55 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 22:56:48 2017 +0200

----------------------------------------------------------------------
 .../client/program/StandaloneClusterClient.java |  5 +-
 .../flink/configuration/ConfigConstants.java    | 65 +++++++++++++++++---
 .../flink/configuration/JobManagerOptions.java  |  7 +++
 .../flink/api/java/ExecutionEnvironment.java    |  4 --
 .../runtime/webmonitor/WebMonitorConfig.java    | 36 +----------
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 32 +++-------
 .../webmonitor/WebRuntimeMonitorITCase.java     | 13 ++--
 .../executiongraph/ExecutionGraphBuilder.java   |  5 +-
 .../runtime/webmonitor/WebMonitorUtils.java     |  6 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  3 +-
 .../environment/StreamExecutionEnvironment.java |  4 --
 .../apache/flink/test/util/TestBaseUtils.java   |  5 +-
 .../flink/test/web/WebFrontendITCase.java       |  5 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  3 +-
 14 files changed, 98 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index fd179c0..7517504 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -18,8 +18,8 @@
 package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -53,8 +53,7 @@ public class StandaloneClusterClient extends ClusterClient {
 	@Override
 	public String getWebInterfaceURL() {
 		String host = this.getJobManagerAddress().getHostString();
-		int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
-			ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+		int port = getFlinkConfiguration().getInteger(JobManagerOptions.WEB_PORT);
 		return "http://" +  host + ":" + port;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 92e6b5d..c3704be 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1338,36 +1338,81 @@ public final class ConfigConstants {
 		key("jobmanager.web.address")
 			.noDefaultValue();
 
-	/** The config key for the port of the JobManager web frontend.
-	 * Setting this value to {@code -1} disables the web frontend. */
+	/**
+	 * The default port of the JobManager web frontend.
+	 * Setting the port to {@code -1} disables the web frontend.
+	 *
+	 * @deprecated use {@link JobManagerOptions#WEB_PORT} instead
+	 */
+	@Deprecated
 	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
 
-	/** Default value to override SSL support for the JobManager web UI */
+	/**
+	 * Default value to override SSL support for the JobManager web UI.
+	 *
+	 * @deprecated use {@link JobManagerOptions#WEB_SSL_ENABLED} instead
+	 */
+	@Deprecated
 	public static final boolean DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED = true;
 
-	/** The default number of archived jobs for the jobmanager */
+	/**
+	 * The default number of archived jobs for the jobmanager.
+	 *
+	 * @deprecated use {@link JobManagerOptions#WEB_ARCHIVE_COUNT} instead
+	 */
+	@Deprecated
 	public static final int DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT = 5;
 
-	/** By default, submitting jobs from the web-frontend is allowed. */
+	/**
+	 * By default, submitting jobs from the web-frontend is allowed.
+	 *
+	 * @deprecated use {@link JobManagerOptions#WEB_SUBMIT_ENABLE} instead
+	 */
+	@Deprecated
 	public static final boolean DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED = true;
 
 	/** @deprecated Config key has been deprecated. Therefore, no default value required. */
 	@Deprecated
 	public static final boolean DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = false;
 
-	/** Default number of checkpoints to remember for recent history. */
+	/**
+	 * Default number of checkpoints to remember for recent history.
+	 *
+	 * @deprecated use {@link JobManagerOptions#WEB_CHECKPOINTS_HISTORY_SIZE} instead
+	 */
+	@Deprecated
 	public static final int DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = 10;
 
-	/** Time after which cached stats are cleaned up. */
+	/**
+	 * Time after which cached stats are cleaned up.
+	 *
+	 * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_CLEANUP_INTERVAL} instead
+	 */
+	@Deprecated
 	public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = 10 * 60 * 1000;
 
-	/** Time after which available stats are deprecated and need to be refreshed (by resampling). */
+	/**
+	 * Time after which available stats are deprecated and need to be refreshed (by resampling).
+	 *
+	 * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_REFRESH_INTERVAL} instead
+	 */
+	@Deprecated
 	public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = 60 * 1000;
 
-	/** Number of samples to take to determine back pressure. */
+	/**
+	 * Number of samples to take to determine back pressure.
+	 *
+	 * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_NUM_SAMPLES} instead
+	 */
+	@Deprecated
 	public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = 100;
 
-	/** Delay between samples to determine back pressure. */
+	/**
+	 * Delay between samples to determine back pressure.
+	 *
+	 * @deprecated use {@link JobManagerOptions#WEB_BACKPRESSURE_DELAY} instead
+	 */
+	@Deprecated
 	public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = 50;
 
 	// ------------------------------ Akka Values ------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index b924e8e..76b6bed 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -116,6 +116,13 @@ public class JobManagerOptions {
 			.defaultValue("*");
 
 	/**
+	 * The config parameter defining the refresh interval for the web-frontend.
+	 */
+	public static final ConfigOption<Long> WEB_REFRESH_INTERVAL =
+		key("jobmanager.web.refresh-interval")
+			.defaultValue(3000L);
+	
+	/**
 	 * Config parameter to override SSL support for the JobManager Web UI
 	 */
 	public static final ConfigOption<Boolean> WEB_SSL_ENABLED =

http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 709ef09..3d8a384 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -1235,10 +1235,6 @@ public abstract class ExecutionEnvironment {
 	public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
 		checkNotNull(conf, "conf");
 
-		if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
-			int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
-			conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
-		}
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
 		LocalEnvironment localEnv = new LocalEnvironment(conf);

http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index dba2145..77537a2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -18,39 +18,11 @@
 
 package org.apache.flink.runtime.webmonitor;
 
-
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 
 public class WebMonitorConfig {
 
-	// ------------------------------------------------------------------------
-	//  Config Keys
-	// ------------------------------------------------------------------------
-
-	/** The port for the runtime monitor web-frontend server. */
-	public static final String JOB_MANAGER_WEB_PORT_KEY = ConfigConstants.JOB_MANAGER_WEB_PORT_KEY;
-
-	/** The initial refresh interval for the web dashboard */
-	public static final String JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY = "jobmanager.web.refresh-interval";
-
-
-	// ------------------------------------------------------------------------
-	//  Default values
-	// ------------------------------------------------------------------------
-
-	/** Default port for the web dashboard (= 8081) */
-	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
-
-	/** Default refresh interval for the web dashboard (= 3000 msecs) */
-	public static final long DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL = 3000;
-
-
-	// ------------------------------------------------------------------------
-	//  Config
-	// ------------------------------------------------------------------------
-
 	/** The configuration queried by this config object */
 	private final Configuration config;
 
@@ -67,17 +39,15 @@ public class WebMonitorConfig {
 	}
 
 	public int getWebFrontendPort() {
-		return config.getInteger(JOB_MANAGER_WEB_PORT_KEY, DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+		return config.getInteger(JobManagerOptions.WEB_PORT);
 	}
 
 	public long getRefreshInterval() {
-		return config.getLong(JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY, DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL);
+		return config.getLong(JobManagerOptions.WEB_REFRESH_INTERVAL);
 	}
 	
 	public boolean isProgramSubmitEnabled() {
-		return config.getBoolean(
-			ConfigConstants.JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY,
-			ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED);
+		return config.getBoolean(JobManagerOptions.WEB_SUBMIT_ENABLE);
 	}
 
 	public String getAllowOrigin() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index f83fa27..03b53ad 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -25,6 +25,7 @@ import io.netty.handler.codec.http.router.Router;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -191,21 +192,13 @@ public class WebRuntimeMonitor implements WebMonitor {
 		stackTraceSamples = new StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000);
 
 		// Back pressure stats tracker config
-		int cleanUpInterval = config.getInteger(
-				ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL,
-				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL);
+		int cleanUpInterval = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_CLEANUP_INTERVAL);
 
-		int refreshInterval = config.getInteger(
-				ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL,
-				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL);
+		int refreshInterval = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_REFRESH_INTERVAL);
 
-		int numSamples = config.getInteger(
-				ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES,
-				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES);
+		int numSamples = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_NUM_SAMPLES);
 
-		int delay = config.getInteger(
-				ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_DELAY,
-				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY);
+		int delay = config.getInteger(JobManagerOptions.WEB_BACKPRESSURE_DELAY);
 
 		Time delayBetweenSamples = Time.milliseconds(delay);
 
@@ -219,10 +212,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService);
 
 		// Config to enable https access to the web-ui
-		boolean enableSSL = config.getBoolean(
-				ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED,
-				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) &&
-			SSLUtils.getSSLEnabled(config);
+		boolean enableSSL = config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) &&	SSLUtils.getSSLEnabled(config);
 
 		if (enableSSL) {
 			LOG.info("Enabling ssl for the web frontend");
@@ -310,9 +300,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		// DELETE is the preferred way of stopping a job (Rest-conform)
 		DELETE(router, new JobStoppingHandler());
 
-		int maxCachedEntries = config.getInteger(
-				ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
-			ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
+		int maxCachedEntries = config.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE);
 		CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
 
 		// Register the checkpoint stats handlers
@@ -525,14 +513,14 @@ public class WebRuntimeMonitor implements WebMonitor {
 	}
 
 	private String getBaseDirStr(Configuration configuration) {
-		return configuration.getString(ConfigConstants.JOB_MANAGER_WEB_TMPDIR_KEY, System.getProperty("java.io.tmpdir"));
+		return configuration.getString(JobManagerOptions.WEB_TMP_DIR);
 	}
 
 	private File getUploadDir(Configuration configuration) {
-		File baseDir = new File(configuration.getString(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY,
+		File baseDir = new File(configuration.getString(JobManagerOptions.WEB_UPLOAD_DIR,
 			getBaseDirStr(configuration)));
 
-		boolean uploadDirSpecified = configuration.containsKey(ConfigConstants.JOB_MANAGER_WEB_UPLOAD_DIR_KEY);
+		boolean uploadDirSpecified = configuration.contains(JobManagerOptions.WEB_UPLOAD_DIR);
 		return uploadDirSpecified ? baseDir : new File(baseDir, "flink-web-" + UUID.randomUUID());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 5ccfe90..a51a234 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -137,8 +138,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
 			Files.createFile(new File(logDir, "jobmanager.out").toPath());
 
-			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
+			config.setInteger(JobManagerOptions.WEB_PORT, 0);
+			config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
 
 			highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
 				config,
@@ -286,8 +287,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			Files.createFile(new File(logDir, "jobmanager.out").toPath());
 
 			final Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
+			config.setInteger(JobManagerOptions.WEB_PORT, 0);
+			config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
 			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
 			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeper.getConnectString());
 
@@ -463,8 +464,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 
 		// Web frontend on random port
 		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
+		config.setInteger(JobManagerOptions.WEB_PORT, 0);
+		config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
 
 		WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
 			config,

http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 88863e4..aa28fbc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -204,9 +205,7 @@ public class ExecutionGraphBuilder {
 			}
 
 			// Maximum number of remembered checkpoints
-			int historySize = jobManagerConfig.getInteger(
-					ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
-					ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
+			int historySize = jobManagerConfig.getInteger(JobManagerOptions.WEB_CHECKPOINTS_HISTORY_SIZE);
 
 			CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
 					historySize,

http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 2baadb5..dd9527e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -25,8 +25,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 
 import java.net.URI;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -80,14 +80,14 @@ public final class WebMonitorUtils {
 			
 			if (logFilePath == null) {
 				LOG.warn("Log file environment variable '{}' is not set.", logEnv);
-				logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
+				logFilePath = config.getString(JobManagerOptions.WEB_LOG_PATH);
 			}
 			
 			// not configured, cannot serve log files
 			if (logFilePath == null || logFilePath.length() < 4) {
 				LOG.warn("JobManager log files are unavailable in the web dashboard. " +
 					"Log file location not found in environment variable '{}' or configuration key '{}'.",
-					logEnv, ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY);
+					logEnv, JobManagerOptions.WEB_LOG_PATH.key());
 				return new LogFileLocation(null, null);
 			}
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 5092643..57a6415 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2532,8 +2532,7 @@ object JobManager {
 
     val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration)
 
-    val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
-      ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
+    val archiveCount = configuration.getInteger(JobManagerOptions.WEB_ARCHIVE_COUNT)
 
     val archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index aad3a4b..97117d2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1661,10 +1661,6 @@ public abstract class StreamExecutionEnvironment {
 	public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
 		checkNotNull(conf, "conf");
 
-		if (!conf.containsKey(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY)) {
-			int port = ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT;
-			conf.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, port);
-		}
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
 		LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf);

http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index f96ab3d..437dd5f 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
@@ -148,8 +149,8 @@ public class TestBaseUtils extends TestLogger {
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
 		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
 
-		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
-		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
+		config.setInteger(JobManagerOptions.WEB_PORT, 8081);
+		config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.toString());
 
 		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 003eb0c..538ac98 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -24,12 +24,15 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 
 import org.apache.commons.io.FileUtils;
 
+import org.apache.commons.math3.optim.nonlinear.vector.JacobianMultivariateVectorOptimizer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.JobManagerCliOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
@@ -84,7 +87,7 @@ public class WebFrontendITCase extends TestLogger {
 		Files.createFile(logFile.toPath());
 		Files.createFile(outFile.toPath());
 		
-		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.getAbsolutePath());
+		config.setString(JobManagerOptions.WEB_LOG_PATH, logFile.getAbsolutePath());
 		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
 
 		cluster = new LocalFlinkMiniCluster(config, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/c1025470/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index b62f957..64417f6 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -372,8 +372,7 @@ public class YarnApplicationMasterRunner {
 				LOG);
 
 			String protocol = "http://";
-			if (config.getBoolean(ConfigConstants.JOB_MANAGER_WEB_SSL_ENABLED,
-				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) {
+			if (config.getBoolean(JobManagerOptions.WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) {
 				protocol = "https://";
 			}
 			final String webMonitorURL = webMonitor == null ? null :


[4/7] flink git commit: [FLINK-6459] Move origin header ConfigOption to JMOptions

Posted by ch...@apache.org.
[FLINK-6459] Move origin header ConfigOption to JMOptions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ab39381
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ab39381
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ab39381

Branch: refs/heads/master
Commit: 4ab39381222df3fd7673c183bc357a4a91b79f32
Parents: 160daa6
Author: zentol <ch...@apache.org>
Authored: Fri May 5 12:13:10 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 22:56:48 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/configuration/ConfigConstants.java | 6 ------
 .../org/apache/flink/configuration/JobManagerOptions.java    | 8 ++++++++
 .../apache/flink/runtime/webmonitor/WebMonitorConfig.java    | 2 +-
 3 files changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ab39381/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 4897854..92e6b5d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1338,12 +1338,6 @@ public final class ConfigConstants {
 		key("jobmanager.web.address")
 			.noDefaultValue();
 
-	/** The config parameter defining the Access-Control-Allow-Origin header for all
-	 * responses from the web-frontend. */
-	public static final ConfigOption<String> JOB_MANAGER_WEB_ACCESS_CONTROL_ALLOW_ORIGIN =
-		key("jobmanager.web.access-control-allow-origin")
-			.defaultValue("*");
-
 	/** The config key for the port of the JobManager web frontend.
 	 * Setting this value to {@code -1} disables the web frontend. */
 	public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;

http://git-wip-us.apache.org/repos/asf/flink/blob/4ab39381/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index b0b1eee..b924e8e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -108,6 +108,14 @@ public class JobManagerOptions {
 		.defaultValue(8081);
 
 	/**
+	 * The config parameter defining the Access-Control-Allow-Origin header for all
+	 * responses from the web-frontend.
+	 */
+	public static final ConfigOption<String> WEB_ACCESS_CONTROL_ALLOW_ORIGIN =
+		key("jobmanager.web.access-control-allow-origin")
+			.defaultValue("*");
+
+	/**
 	 * Config parameter to override SSL support for the JobManager Web UI
 	 */
 	public static final ConfigOption<Boolean> WEB_SSL_ENABLED =

http://git-wip-us.apache.org/repos/asf/flink/blob/4ab39381/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index 1807e7d..dba2145 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -81,6 +81,6 @@ public class WebMonitorConfig {
 	}
 
 	public String getAllowOrigin() {
-		return config.getString(ConfigConstants.JOB_MANAGER_WEB_ACCESS_CONTROL_ALLOW_ORIGIN);
+		return config.getString(JobManagerOptions.WEB_ACCESS_CONTROL_ALLOW_ORIGIN);
 	}
 }


[2/7] flink git commit: [FLINK-5978] Move JM WebFrontend address ConfigOption to JMOptions

Posted by ch...@apache.org.
[FLINK-5978] Move JM WebFrontend address ConfigOption to JMOptions

This closes #3552.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/160daa64
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/160daa64
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/160daa64

Branch: refs/heads/master
Commit: 160daa64eff27dee8b31ad7a8b2b89f04a74c04d
Parents: 5c4560d
Author: mengji.fy <me...@taobao.com>
Authored: Fri May 5 12:11:20 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 22:56:48 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/configuration/ConfigConstants.java  | 7 ++++++-
 .../org/apache/flink/configuration/JobManagerOptions.java     | 7 +++++++
 .../org/apache/flink/runtime/webmonitor/WebMonitorConfig.java | 3 ++-
 3 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/160daa64/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 975a3d4..4897854 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1328,7 +1328,12 @@ public final class ConfigConstants {
 	
 	// ------------------------- JobManager Web Frontend ----------------------
 
-	/** The config key for the address of the JobManager web frontend. */
+	/**
+	 * The config key for the address of the JobManager web frontend.
+	 *
+	 * @deprecated use {@link JobManagerOptions#WEB_FRONTEND_ADDRESS} instead
+	 */
+	@Deprecated
 	public static final ConfigOption<String> DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS =
 		key("jobmanager.web.address")
 			.noDefaultValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/160daa64/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 5481d7a..b0b1eee 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -94,6 +94,13 @@ public class JobManagerOptions {
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Config parameter defining the runtime monitor web-frontend server address.
+	 */
+	public static final ConfigOption<String> WEB_FRONTEND_ADDRESS =
+		key("jobmanager.web.address")
+			.noDefaultValue();
+
+	/**
 	 * The port for the runtime monitor web-frontend server.
 	 */
 	public static final ConfigOption<Integer> WEB_PORT =

http://git-wip-us.apache.org/repos/asf/flink/blob/160daa64/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
index 11e94b0..1807e7d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorConfig.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 
 public class WebMonitorConfig {
 
@@ -62,7 +63,7 @@ public class WebMonitorConfig {
 	}
 
 	public String getWebFrontendAddress() {
-		return config.getValue(ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_ADDRESS);
+		return config.getValue(JobManagerOptions.WEB_FRONTEND_ADDRESS);
 	}
 
 	public int getWebFrontendPort() {


[3/7] flink git commit: [FLINK-6164] Make ProcessWindowFunction a RichFunction

Posted by ch...@apache.org.
[FLINK-6164] Make ProcessWindowFunction a RichFunction

This closes #3824.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c4560d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c4560d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c4560d3

Branch: refs/heads/master
Commit: 5c4560d3bcff47f8563c75ca909d20469e4736c0
Parents: b787536
Author: zentol <ch...@apache.org>
Authored: Wed May 3 15:57:05 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 22:56:48 2017 +0200

----------------------------------------------------------------------
 .../FoldApplyProcessAllWindowFunction.java      |  2 +-
 .../FoldApplyProcessWindowFunction.java         |  2 +-
 .../windowing/ProcessAllWindowFunction.java     |  4 +-
 .../windowing/ProcessWindowFunction.java        |  4 +-
 .../ReduceApplyProcessAllWindowFunction.java    |  3 +-
 .../ReduceApplyProcessWindowFunction.java       |  2 +-
 .../windowing/RichProcessAllWindowFunction.java | 53 ++------------------
 .../windowing/RichProcessWindowFunction.java    | 53 ++------------------
 .../functions/InternalWindowFunctionTest.java   | 11 ++--
 ...AlignedProcessingTimeWindowOperatorTest.java |  4 +-
 .../function/ProcessAllWindowFunction.scala     |  7 ++-
 .../scala/function/ProcessWindowFunction.scala  |  6 ++-
 .../function/RichProcessAllWindowFunction.scala | 53 +-------------------
 .../function/RichProcessWindowFunction.scala    | 53 +-------------------
 .../ScalaProcessWindowFunctionWrapper.scala     | 20 +++-----
 ...ngIdentityRichProcessAllWindowFunction.scala |  4 +-
 ...ckingIdentityRichProcessWindowFunction.scala |  4 +-
 17 files changed, 45 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index 38244dd..b96a8ff 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.Collector;
  */
 @Internal
 public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
-	extends RichProcessAllWindowFunction<T, R, W>
+	extends ProcessAllWindowFunction<T, R, W>
 	implements OutputTypeConfigurable<R> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
index 1b2c2e2..98f5622 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -42,7 +42,7 @@ import org.apache.flink.util.Collector;
  */
 @Internal
 public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
-	extends RichProcessWindowFunction<T, R, K, W>
+	extends ProcessWindowFunction<T, R, K, W>
 	implements OutputTypeConfigurable<R> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
index 4d247a7..34a37bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
  * @param <W> The type of {@code Window} that this window function can be applied on.
  */
 @PublicEvolving
-public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implements Function {
+public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> extends AbstractRichFunction {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
index 2c80e9e..506b610 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -34,7 +34,7 @@ import org.apache.flink.util.Collector;
  * @param <W> The type of {@code Window} that this window function can be applied on.
  */
 @PublicEvolving
-public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
+public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
index d1f9ccd..e7e6609 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -32,8 +32,7 @@ import org.apache.flink.util.Collector;
  * {@link ReduceFunction}.
  */
 @Internal
-public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
-	extends RichProcessAllWindowFunction<T, R, W> {
+public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R> extends ProcessAllWindowFunction<T, R, W> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
index 836726d..18037b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
  */
 @Internal
 public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
-	extends RichProcessWindowFunction<T, R, K, W> {
+	extends ProcessWindowFunction<T, R, K, W> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
index 1130fa5..a800870 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
@@ -19,10 +19,6 @@
 package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 /**
@@ -32,53 +28,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
  * @param <IN> The type of the input value.
  * @param <OUT> The type of the output value.
  * @param <W> The type of {@code Window} that this window function can be applied on.
+ *
+ * @deprecated use {@link ProcessAllWindowFunction} instead
  */
 @PublicEvolving
-public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window>
-		extends ProcessAllWindowFunction<IN, OUT, W>
-		implements RichFunction {
+@Deprecated
+public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window> extends ProcessAllWindowFunction<IN, OUT, W> {
 
 	private static final long serialVersionUID = 1L;
-
-
-	// --------------------------------------------------------------------------------------------
-	//  Runtime context access
-	// --------------------------------------------------------------------------------------------
-
-	private transient RuntimeContext runtimeContext;
-
-	@Override
-	public void setRuntimeContext(RuntimeContext t) {
-		this.runtimeContext = t;
-	}
-
-	@Override
-	public RuntimeContext getRuntimeContext() {
-		if (this.runtimeContext != null) {
-			return this.runtimeContext;
-		} else {
-			throw new IllegalStateException("The runtime context has not been initialized.");
-		}
-	}
-
-	@Override
-	public IterationRuntimeContext getIterationRuntimeContext() {
-		if (this.runtimeContext == null) {
-			throw new IllegalStateException("The runtime context has not been initialized.");
-		} else if (this.runtimeContext instanceof IterationRuntimeContext) {
-			return (IterationRuntimeContext) this.runtimeContext;
-		} else {
-			throw new IllegalStateException("This stub is not part of an iteration step function.");
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Default life cycle methods
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void open(Configuration parameters) throws Exception {}
-
-	@Override
-	public void close() throws Exception {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
index ac55bc6..83da065 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessWindowFunction.java
@@ -19,10 +19,6 @@
 package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 /**
@@ -33,53 +29,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
  * @param <OUT> The type of the output value.
  * @param <KEY> The type of the key.
  * @param <W> The type of {@code Window} that this window function can be applied on.
+ *
+ * @deprecated use {@link ProcessWindowFunction} instead
  */
 @PublicEvolving
-public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window>
-		extends ProcessWindowFunction<IN, OUT, KEY, W>
-		implements RichFunction {
+@Deprecated
+public abstract class RichProcessWindowFunction<IN, OUT, KEY, W extends Window> extends ProcessWindowFunction<IN, OUT, KEY, W> {
 
 	private static final long serialVersionUID = 1L;
-
-
-	// --------------------------------------------------------------------------------------------
-	//  Runtime context access
-	// --------------------------------------------------------------------------------------------
-
-	private transient RuntimeContext runtimeContext;
-
-	@Override
-	public void setRuntimeContext(RuntimeContext t) {
-		this.runtimeContext = t;
-	}
-
-	@Override
-	public RuntimeContext getRuntimeContext() {
-		if (this.runtimeContext != null) {
-			return this.runtimeContext;
-		} else {
-			throw new IllegalStateException("The runtime context has not been initialized.");
-		}
-	}
-
-	@Override
-	public IterationRuntimeContext getIterationRuntimeContext() {
-		if (this.runtimeContext == null) {
-			throw new IllegalStateException("The runtime context has not been initialized.");
-		} else if (this.runtimeContext instanceof IterationRuntimeContext) {
-			return (IterationRuntimeContext) this.runtimeContext;
-		} else {
-			throw new IllegalStateException("This stub is not part of an iteration step function.");
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Default life cycle methods
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void open(Configuration parameters) throws Exception {}
-
-	@Override
-	public void close() throws Exception {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index 4b8057f..4b0f5ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -24,11 +24,10 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -612,7 +611,7 @@ public class InternalWindowFunctionTest {
 	}
 
 	public static class ProcessWindowFunctionMock
-		extends RichProcessWindowFunction<Long, String, Long, TimeWindow>
+		extends ProcessWindowFunction<Long, String, Long, TimeWindow>
 		implements OutputTypeConfigurable<String> {
 
 		private static final long serialVersionUID = 1L;
@@ -626,7 +625,7 @@ public class InternalWindowFunctionTest {
 	}
 
 	public static class AggregateProcessWindowFunctionMock
-			extends RichProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>
+			extends ProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>
 			implements OutputTypeConfigurable<String> {
 
 		private static final long serialVersionUID = 1L;
@@ -640,7 +639,7 @@ public class InternalWindowFunctionTest {
 	}
 
 	public static class AggregateProcessAllWindowFunctionMock
-			extends RichProcessAllWindowFunction<Map<Long, Long>, String, TimeWindow>
+			extends ProcessAllWindowFunction<Map<Long, Long>, String, TimeWindow>
 			implements OutputTypeConfigurable<String> {
 
 		private static final long serialVersionUID = 1L;
@@ -679,7 +678,7 @@ public class InternalWindowFunctionTest {
 	}
 
 	public static class ProcessAllWindowFunctionMock
-		extends RichProcessAllWindowFunction<Long, String, TimeWindow>
+		extends ProcessAllWindowFunction<Long, String, TimeWindow>
 		implements OutputTypeConfigurable<String> {
 
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index a8d3154..2f7e302 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -35,9 +35,7 @@ import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
@@ -1038,7 +1036,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 
-	private static class StatefulFunction extends RichProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
+	private static class StatefulFunction extends ProcessWindowFunction<Integer, Integer, Integer, TimeWindow> {
 
 		// we use a concurrent map here even though there is no concurrency, to
 		// get "volatile" style access to entries

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
index 2f0e48e..49911e4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -18,10 +18,8 @@
 
 package org.apache.flink.streaming.api.scala.function
 
-import java.io.Serializable
-
 import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.functions.AbstractRichFunction
 import org.apache.flink.api.common.state.KeyedStateStore
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
@@ -35,7 +33,8 @@ import org.apache.flink.util.Collector
   * @tparam W The type of the window.
   */
 @PublicEvolving
-abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable {
+abstract class ProcessAllWindowFunction[IN, OUT, W <: Window]
+    extends AbstractRichFunction {
   /**
     * Evaluates the window and outputs none or several elements.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
index bc79a26..d2075db 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala.function
 import java.io.Serializable
 
 import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.functions.AbstractRichFunction
 import org.apache.flink.api.common.state.KeyedStateStore
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
@@ -36,7 +36,9 @@ import org.apache.flink.util.Collector
   * @tparam W The type of the window.
   */
 @PublicEvolving
-abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
+abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
+    extends AbstractRichFunction {
+
   /**
     * Evaluates the window and outputs none or several elements.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
index 22d64a8..6edc1e6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
@@ -18,11 +18,7 @@
 
 package org.apache.flink.streaming.api.scala.function
 
-import java.beans.Transient
-
 import org.apache.flink.annotation.Public
-import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.Window
 
 /**
@@ -34,53 +30,8 @@ import org.apache.flink.streaming.api.windowing.windows.Window
   * @tparam W The type of the window.
   */
 @Public
+@deprecated("use [[ProcessAllWindowFunction]] instead")
 abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]
-    extends ProcessAllWindowFunction[IN, OUT, W]
-    with RichFunction {
-
-  @Transient
-  private var runtimeContext: RuntimeContext = null
-
-  // --------------------------------------------------------------------------------------------
-  //  Runtime context access
-  // --------------------------------------------------------------------------------------------
-
-  override def setRuntimeContext(t: RuntimeContext) {
-    this.runtimeContext = t
-  }
-
-  override def getRuntimeContext: RuntimeContext = {
-    if (this.runtimeContext != null) {
-      this.runtimeContext
-    }
-    else {
-      throw new IllegalStateException("The runtime context has not been initialized.")
-    }
-  }
-
-  override def getIterationRuntimeContext: IterationRuntimeContext = {
-    if (this.runtimeContext == null) {
-      throw new IllegalStateException("The runtime context has not been initialized.")
-    }
-    else {
-      this.runtimeContext match {
-        case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext
-        case _ =>
-          throw new IllegalStateException("This stub is not part of an iteration step function.")
-      }
-    }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Default life cycle methods
-  // --------------------------------------------------------------------------------------------
-
-  @throws[Exception]
-  override def open(parameters: Configuration) {
-  }
-
-  @throws[Exception]
-  override def close() {
-  }
+    extends ProcessAllWindowFunction[IN, OUT, W] {
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
index 320685a..d9cd275 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessWindowFunction.scala
@@ -18,11 +18,7 @@
 
 package org.apache.flink.streaming.api.scala.function
 
-import java.beans.Transient
-
 import org.apache.flink.annotation.Public
-import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.Window
 
 /**
@@ -35,53 +31,8 @@ import org.apache.flink.streaming.api.windowing.windows.Window
   * @tparam W The type of the window.
   */
 @Public
+@deprecated("use [[ProcessWindowFunction]] instead")
 abstract class RichProcessWindowFunction[IN, OUT, KEY, W <: Window]
-    extends ProcessWindowFunction[IN, OUT, KEY, W]
-    with RichFunction {
-
-  @Transient
-  private var runtimeContext: RuntimeContext = null
-
-  // --------------------------------------------------------------------------------------------
-  //  Runtime context access
-  // --------------------------------------------------------------------------------------------
-
-  override def setRuntimeContext(t: RuntimeContext) {
-    this.runtimeContext = t
-  }
-
-  override def getRuntimeContext: RuntimeContext = {
-    if (this.runtimeContext != null) {
-      this.runtimeContext
-    }
-    else {
-      throw new IllegalStateException("The runtime context has not been initialized.")
-    }
-  }
-
-  override def getIterationRuntimeContext: IterationRuntimeContext = {
-    if (this.runtimeContext == null) {
-      throw new IllegalStateException("The runtime context has not been initialized.")
-    }
-    else {
-      this.runtimeContext match {
-        case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext
-        case _ =>
-          throw new IllegalStateException("This stub is not part of an iteration step function.")
-      }
-    }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Default life cycle methods
-  // --------------------------------------------------------------------------------------------
-  
-  @throws[Exception]
-  override def open(parameters: Configuration) {
-  }
-
-  @throws[Exception]
-  override def close() {
-  }
+    extends ProcessWindowFunction[IN, OUT, KEY, W] {
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 263373e..bc4b7dd 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -21,13 +21,9 @@ package org.apache.flink.streaming.api.scala.function.util
 import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
-import org.apache.flink.streaming.api.functions.windowing.{RichProcessWindowFunction => JRichProcessWindowFunction}
-import org.apache.flink.streaming.api.functions.windowing.{RichProcessAllWindowFunction => JRichProcessAllWindowFunction}
 import org.apache.flink.streaming.api.functions.windowing.{ProcessAllWindowFunction => JProcessAllWindowFunction}
 import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction => ScalaProcessWindowFunction}
 import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction => ScalaProcessAllWindowFunction}
-import org.apache.flink.streaming.api.scala.function.{RichProcessWindowFunction => ScalaRichProcessWindowFunction}
-import org.apache.flink.streaming.api.scala.function.{RichProcessAllWindowFunction => ScalaRichProcessAllWindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
@@ -43,7 +39,7 @@ import scala.collection.JavaConverters._
   */
 final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
     private[this] val func: ScalaProcessWindowFunction[IN, OUT, KEY, W])
-    extends JRichProcessWindowFunction[IN, OUT, KEY, W] {
+    extends JProcessWindowFunction[IN, OUT, KEY, W] {
 
   override def process(
       key: KEY,
@@ -82,7 +78,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
   override def setRuntimeContext(t: RuntimeContext): Unit = {
     super.setRuntimeContext(t)
     func match {
-      case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.setRuntimeContext(t)
+      case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => rfunc.setRuntimeContext(t)
       case _ =>
     }
   }
@@ -90,7 +86,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
   override def open(parameters: Configuration): Unit = {
     super.open(parameters)
     func match {
-      case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.open(parameters)
+      case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => rfunc.open(parameters)
       case _ =>
     }
   }
@@ -98,7 +94,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
   override def close(): Unit = {
     super.close()
     func match {
-      case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.close()
+      case rfunc: ScalaProcessWindowFunction[IN, OUT, KEY, W] => rfunc.close()
       case _ =>
     }
   }
@@ -114,7 +110,7 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
   */
 final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
     private[this] val func: ScalaProcessAllWindowFunction[IN, OUT, W])
-    extends JRichProcessAllWindowFunction[IN, OUT, W] {
+    extends JProcessAllWindowFunction[IN, OUT, W] {
 
   override def process(
       context: JProcessAllWindowFunction[IN, OUT, W]#Context,
@@ -145,7 +141,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
   override def setRuntimeContext(t: RuntimeContext): Unit = {
     super.setRuntimeContext(t)
     func match {
-      case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.setRuntimeContext(t)
+      case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => rfunc.setRuntimeContext(t)
       case _ =>
     }
   }
@@ -153,7 +149,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
   override def open(parameters: Configuration): Unit = {
     super.open(parameters)
     func match {
-      case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.open(parameters)
+      case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => rfunc.open(parameters)
       case _ =>
     }
   }
@@ -161,7 +157,7 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
   override def close(): Unit = {
     super.close()
     func match {
-      case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.close()
+      case rfunc : ScalaProcessAllWindowFunction[IN, OUT, W] => rfunc.close()
       case _ =>
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
index df005fa..146452b 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.scala.testutils
 
 import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.scala.function.RichProcessAllWindowFunction
+import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
 
 class CheckingIdentityRichProcessAllWindowFunction[T, W <: Window]
-  extends RichProcessAllWindowFunction[T, T, W] {
+  extends ProcessAllWindowFunction[T, T, W] {
 
   override def process(context: Context, input: Iterable[T], out: Collector[T]): Unit = {
     for (value <- input) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5c4560d3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
index d62f2d3..2ec179a 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessWindowFunction.scala
@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.scala.testutils
 
 import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.scala.function.RichProcessWindowFunction
+import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
 
 
 class CheckingIdentityRichProcessWindowFunction[T, K, W <: Window]
-  extends RichProcessWindowFunction[T, T, K, W] {
+  extends ProcessWindowFunction[T, T, K, W] {
 
   override def process(key: K, context: Context, input: Iterable[T], out: Collector[T]): Unit = {
     for (value <- input) {


[7/7] flink git commit: [FLINK-6013][metrics] Add Datadog HTTP metrics reporter

Posted by ch...@apache.org.
[FLINK-6013][metrics] Add Datadog HTTP metrics reporter

This closes #3736.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54ceec16
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54ceec16
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54ceec16

Branch: refs/heads/master
Commit: 54ceec16c11655da4181c0816a3b12d1c4bab465
Parents: 50baec6
Author: Bowen Li <bo...@gmail.com>
Authored: Tue Apr 18 10:27:17 2017 -0700
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 22:56:49 2017 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  24 +++
 flink-dist/pom.xml                              |   7 +
 flink-dist/src/main/assemblies/opt.xml          |   7 +
 flink-metrics/flink-metrics-datadog/pom.xml     | 108 ++++++++++
 .../apache/flink/metrics/datadog/DCounter.java  |  44 ++++
 .../apache/flink/metrics/datadog/DGauge.java    |  45 ++++
 .../apache/flink/metrics/datadog/DMeter.java    |  42 ++++
 .../apache/flink/metrics/datadog/DMetric.java   |  84 ++++++++
 .../apache/flink/metrics/datadog/DSeries.java   |  45 ++++
 .../metrics/datadog/DatadogHttpClient.java      |  97 +++++++++
 .../metrics/datadog/DatadogHttpReporter.java    | 210 +++++++++++++++++++
 .../flink/metrics/datadog/MetricType.java       |  30 +++
 .../metrics/datadog/DatadogHttpClientTest.java  | 199 ++++++++++++++++++
 .../src/test/resources/log4j-test.properties    |  27 +++
 flink-metrics/pom.xml                           |   1 +
 15 files changed, 970 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 290a452..2bc65a6 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -436,6 +436,30 @@ metrics.reporter.stsd.port: 8125
 
 {% endhighlight %}
 
+### Datadog (org.apache.flink.metrics.datadog.DatadogHttpReporter)
+
+In order to use this reporter you must copy `/opt/flink-metrics-datadog-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Note that any variables in Flink metrics, such as `<host>`, `<job_name>`, `<tm_id>`, `<subtask_index>`, `<task_name>`, and `<operator_name>`,
+will be sent to Datadog as tags. Tags will look like `host:localhost` and `job_name:myjobname`.
+
+Parameters:
+
+- `apikey` - the Datadog API key
+- `tags` - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by commas only
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: dghttp
+metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
+metrics.reporter.dghttp.apikey: xxx
+metrics.reporter.dghttp.tags: myflinkapp,prod
+
+{% endhighlight %}
+
 ## System metrics
 
 By default Flink gathers several metrics that provide deep insights on the current state.

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 9773991..6d8debf 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -202,6 +202,13 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-datadog</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
 		<!-- end optional Flink metrics reporters -->
 
 		<!-- start optional Flink libraries -->

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index 95218d7..0386b92 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -105,6 +105,13 @@
 		</file>
 
 		<file>
+			<source>../flink-metrics/flink-metrics-datadog/target/flink-metrics-datadog-${project.version}-shaded.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-metrics-datadog-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<file>
 			<source>../flink-shaded-hadoop/flink-shaded-hadoop2/target/flink-shaded-hadoop2-${project.version}.jar</source>
 			<outputDirectory>opt/</outputDirectory>
 			<destName>flink-shaded-hadoop2-${project.version}.jar</destName>

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/pom.xml b/flink-metrics/flink-metrics-datadog/pom.xml
new file mode 100644
index 0000000..0d473fc
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics-datadog</artifactId>
+	<name>flink-metrics-datadog</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.squareup.okhttp3</groupId>
+			<artifactId>okhttp</artifactId>
+			<version>3.7.0</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.squareup.okio</groupId>
+			<artifactId>okio</artifactId>
+			<version>1.12.0</version>
+		</dependency>
+
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadedArtifactAttached>true</shadedArtifactAttached>
+							<relocations combine.children="append">
+								<relocation>
+									<pattern>okhttp3</pattern>
+									<shadedPattern>org.apache.flink.shaded.okhttp3</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>okio</pattern>
+									<shadedPattern>org.apache.flink.shaded.okio</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
new file mode 100644
index 0000000..58abbd6
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DCounter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Counter;
+
+import java.util.List;
+
+/**
+ * Mapping of counter between Flink and Datadog
+ * */
+public class DCounter extends DMetric {
+	private final Counter counter;
+
+	public DCounter(Counter c, String metricName, String host, List<String> tags) {
+		super(MetricType.counter, metricName, host, tags);
+		counter = c;
+	}
+
+	/**
+	 * Visibility of this method must not be changed
+	 * since we deliberately do not map it to a JSON object in the Datadog-defined format
+	 * */
+	@Override
+	public Number getMetricValue() {
+		return counter.getCount();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
new file mode 100644
index 0000000..8deb117
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DGauge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+
+import org.apache.flink.metrics.Gauge;
+
+import java.util.List;
+
+/**
+ * Mapping of gauge between Flink and Datadog
+ * */
+public class DGauge extends DMetric {
+	private final Gauge<Number> gauge;
+
+	public DGauge(Gauge<Number> g, String metricName, String host, List<String> tags) {
+		super(MetricType.gauge, metricName, host, tags);
+		gauge = g;
+	}
+
+	/**
+	 * Visibility of this method must not be changed
+	 * since we deliberately do not map it to a JSON object in the Datadog-defined format
+	 * */
+	@Override
+	public Number getMetricValue() {
+		return gauge.getValue();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
new file mode 100644
index 0000000..181a00c
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMeter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Meter;
+
+import java.util.List;
+
+/**
+ * Mapping of meter between Flink and Datadog
+ *
+ * Only the rate of the meter is considered, due to the Datadog HTTP API's limited support for meters
+ * */
+public class DMeter extends DMetric {
+	private final Meter meter;
+
+	public DMeter(Meter m, String metricName, String host, List<String> tags) {
+		super(MetricType.gauge, metricName, host, tags);
+		meter = m;
+	}
+
+	@Override
+	public Number getMetricValue() {
+		return meter.getRate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
new file mode 100644
index 0000000..3f9d6ff
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DMetric.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract metric of Datadog for serialization
+ * */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public abstract class DMetric {
+	private static final long MILLIS_TO_SEC = 1000L;
+
+	/**
+	 * Names of the metric/type/tags fields and their getters must not be changed
+	 * since they are mapped to JSON objects in the Datadog-defined format
+	 * */
+	private final String metric; // Metric name
+	private final MetricType type;
+	private final String host;
+	private final List<String> tags;
+
+	public DMetric(MetricType metricType, String metric, String host, List<String> tags) {
+		this.type = metricType;
+		this.metric = metric;
+		this.host = host;
+		this.tags = tags;
+	}
+
+	public MetricType getType() {
+		return type;
+	}
+
+	public String getMetric() {
+		return metric;
+	}
+
+	public String getHost() {
+		return host;
+	}
+
+	public List<String> getTags() {
+		return tags;
+	}
+
+	public List<List<Number>> getPoints() {
+		// One single data point
+		List<Number> point = new ArrayList<>();
+		point.add(getUnixEpochTimestamp());
+		point.add(getMetricValue());
+
+		List<List<Number>> points = new ArrayList<>();
+		points.add(point);
+
+		return points;
+	}
+
+	@JsonIgnore
+	public abstract Number getMetricValue();
+
+	public static long getUnixEpochTimestamp() {
+		return (System.currentTimeMillis() / MILLIS_TO_SEC);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
new file mode 100644
index 0000000..fb0bb09
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DSeries.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Json serialization between Flink and Datadog
+ **/
+public class DSeries {
+	/**
+	 * Names of series field and its getters must not be changed
+	 * since they are mapped to json objects in a Datadog-defined format
+	 * */
+	private List<DMetric> series;
+
+	public DSeries() {
+		series = new ArrayList<>();
+	}
+
+	public void addMetric(DMetric metric) {
+		series.add(metric);
+	}
+
+	public List<DMetric> getSeries() {
+		return series;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
new file mode 100644
index 0000000..dfbcee1
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import okhttp3.RequestBody;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Http client talking to Datadog
+ * */
+public class DatadogHttpClient{
+	private static final String SERIES_URL_FORMAT = "https://app.datadoghq.com/api/v1/series?api_key=%s";
+	private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.com/api/v1/validate?api_key=%s";
+	private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8");
+	private static final int TIMEOUT = 3;
+	private static final ObjectMapper MAPPER = new ObjectMapper();
+
+	private final String seriesUrl;
+	private final String validateUrl;
+	private final OkHttpClient client;
+	private final String apiKey;
+
+	public DatadogHttpClient(String dgApiKey) {
+		if (dgApiKey == null || dgApiKey.isEmpty()) {
+			throw new IllegalArgumentException("Invalid API key:" + dgApiKey);
+		}
+
+		apiKey = dgApiKey;
+		client = new OkHttpClient.Builder()
+			.connectTimeout(TIMEOUT, TimeUnit.SECONDS)
+			.writeTimeout(TIMEOUT, TimeUnit.SECONDS)
+			.readTimeout(TIMEOUT, TimeUnit.SECONDS)
+			.build();
+
+		seriesUrl = String.format(SERIES_URL_FORMAT, apiKey);
+		validateUrl = String.format(VALIDATE_URL_FORMAT, apiKey);
+		validateApiKey();
+	}
+
+	/** Checks the API key against Datadog's validate endpoint; throws if rejected or unreachable. */
+	private void validateApiKey() {
+		Request r = new Request.Builder().url(validateUrl).get().build();
+
+		// try-with-resources closes the Response so its body/connection is released on every path
+		try (Response response = client.newCall(r).execute()) {
+			if (!response.isSuccessful()) {
+				throw new IllegalArgumentException(String.format("API key: %s is invalid", apiKey));
+			}
+		} catch(IOException e) {
+			throw new IllegalStateException("Failed contacting Datadog to validate API key", e);
+		}
+	}
+
+	public void send(DatadogHttpReporter.DatadogHttpRequest request) throws Exception {
+		String postBody = serialize(request.getSeries());
+
+		Request r = new Request.Builder()
+			.url(seriesUrl)
+			.post(RequestBody.create(MEDIA_TYPE, postBody))
+			.build();
+
+		client.newCall(r).execute().close();
+	}
+
+	public static String serialize(Object obj) throws JsonProcessingException {
+		return MAPPER.writeValueAsString(obj);
+	}
+
+	public void close() {
+		client.dispatcher().executorService().shutdown();
+		client.connectionPool().evictAll();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
new file mode 100644
index 0000000..fcb5c4b
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Metric Reporter for Datadog
+ *
+ * Variables in metrics scope will be sent to Datadog as tags
+ * */
+public class DatadogHttpReporter implements MetricReporter, Scheduled {
+	private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class);
+	private static final String HOST_VARIABLE = "<host>";
+
+	// Both Flink's Gauge and Meter values are taken as gauge in Datadog
+	private final Map<Gauge, DGauge> gauges = new ConcurrentHashMap<>();
+	private final Map<Counter, DCounter> counters = new ConcurrentHashMap<>();
+	private final Map<Meter, DMeter> meters = new ConcurrentHashMap<>();
+
+	private DatadogHttpClient client;
+	private List<String> configTags;
+
+	public static final String API_KEY = "apikey";
+	public static final String TAGS = "tags";
+
+	@Override
+	public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+		final String name = group.getMetricIdentifier(metricName);
+
+		List<String> tags = new ArrayList<>(configTags);
+		tags.addAll(getTagsFromMetricGroup(group));
+		String host = getHostFromMetricGroup(group);
+
+		if (metric instanceof Counter) {
+			Counter c = (Counter) metric;
+			counters.put(c, new DCounter(c, name, host, tags));
+		} else if (metric instanceof Gauge) {
+			Gauge g = (Gauge) metric;
+			gauges.put(g, new DGauge(g, name, host, tags));
+		} else if (metric instanceof Meter) {
+			Meter m = (Meter) metric;
+			// Only consider rate
+			meters.put(m, new DMeter(m, name, host, tags));
+		} else if (metric instanceof Histogram) {
+			LOGGER.warn("Cannot add {} because Datadog HTTP API doesn't support Histogram", metricName);
+		} else {
+			LOGGER.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
+				"does not support this metric type.", metric.getClass().getName());
+		}
+	}
+
+	@Override
+	public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+		if (metric instanceof Counter) {
+			counters.remove(metric);
+		} else if (metric instanceof Gauge) {
+			gauges.remove(metric);
+		} else if (metric instanceof Meter) {
+			meters.remove(metric);
+		} else if (metric instanceof Histogram) {
+			// No Histogram is registered
+		} else {
+			LOGGER.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
+				"does not support this metric type.", metric.getClass().getName());
+		}
+	}
+
+	@Override
+	public void open(MetricConfig config) {
+		client = new DatadogHttpClient(config.getString(API_KEY, null));
+		LOGGER.info("Configured DatadogHttpReporter");
+
+		configTags = getTagsFromConfig(config.getString(TAGS, ""));
+	}
+
+	@Override
+	public void close() {
+		client.close();
+		LOGGER.info("Shut down DatadogHttpReporter");
+	}
+
+	/**
+	 * Collects the currently registered gauges, counters and meters into one request
+	 * and sends it to Datadog.
+	 *
+	 * A gauge whose value cannot be read is unregistered from this reporter.
+	 * A failed send is logged as a warning and otherwise ignored.
+	 * */
+	@Override
+	public void report() {
+		DatadogHttpRequest request = new DatadogHttpRequest();
+
+		for (Map.Entry<Gauge, DGauge> entry : gauges.entrySet()) {
+			DGauge g = entry.getValue();
+			try {
+				// Will throw exception if the Gauge is not of Number type
+				// Flink uses Gauge to store many types other than Number
+				g.getMetricValue();
+				request.addGauge(g);
+			} catch (Exception e) {
+				// Remove that Gauge if it's not of Number type; leave a trace so the drop can be diagnosed
+				LOGGER.debug("Removing gauge {} because its value could not be reported.", g.getMetric(), e);
+				gauges.remove(entry.getKey());
+			}
+		}
+
+		for (DCounter c : counters.values()) {
+			request.addCounter(c);
+		}
+
+		for (DMeter m : meters.values()) {
+			request.addMeter(m);
+		}
+
+		try {
+			client.send(request);
+		} catch (Exception e) {
+			LOGGER.warn("Failed reporting metrics to Datadog.", e);
+		}
+	}
+
+	/**
+	 * Get config tags from config 'metrics.reporter.dghttp.tags'
+	 *
+	 * The value is a comma-separated list. Entries are trimmed and blank entries are skipped,
+	 * so an empty config value yields no tags instead of a single empty tag.
+	 *
+	 * @param str raw config value, must not be null
+	 * @return the non-blank tags in their configured order
+	 * */
+	private List<String> getTagsFromConfig(String str) {
+		List<String> tags = new ArrayList<>();
+
+		for (String tag : str.split(",")) {
+			String trimmed = tag.trim();
+			// "".split(",") returns [""], which would otherwise be sent as an empty tag
+			if (!trimmed.isEmpty()) {
+				tags.add(trimmed);
+			}
+		}
+
+		return tags;
+	}
+
+	/**
+	 * Get tags from MetricGroup#getAllVariables(), excluding 'host'
+	 * */
+	private List<String> getTagsFromMetricGroup(MetricGroup metricGroup) {
+		List<String> tags = new ArrayList<>();
+
+		for (Map.Entry<String, String> entry: metricGroup.getAllVariables().entrySet()) {
+			if(!entry.getKey().equals(HOST_VARIABLE)) {
+				tags.add(getVariableName(entry.getKey()) + ":" + entry.getValue());
+			}
+		}
+
+		return tags;
+	}
+
+	/**
+	 * Get host from MetricGroup#getAllVariables() if it exists; returns Null otherwise
+	 * */
+	private String getHostFromMetricGroup(MetricGroup metricGroup) {
+		return metricGroup.getAllVariables().get(HOST_VARIABLE);
+	}
+
+	/**
+	 * Given "<xxx>", return "xxx"
+	 * */
+	private String getVariableName(String str) {
+		return str.substring(1, str.length() - 1);
+	}
+
+	/**
+	 * Batch of metrics collected into one DSeries; the Datadog client serializes and sends it via HTTP
+	 * */
+	static class DatadogHttpRequest {
+		private final DSeries series;
+
+		public DatadogHttpRequest() {
+			series = new DSeries();
+		}
+
+		public void addGauge(DGauge gauge) {
+			series.addMetric(gauge);
+		}
+
+		public void addCounter(DCounter counter) {
+			series.addMetric(counter);
+		}
+
+		public void addMeter(DMeter meter) {
+			series.addMetric(meter);
+		}
+
+		public DSeries getSeries() {
+			return series;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
new file mode 100644
index 0000000..97f9b29
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/MetricType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+/**
+ * Metric types supported by Datadog
+ * */
+public enum MetricType {
+	/**
+	 * Names of 'gauge' and 'counter' must not be changed
+	 * since they are mapped to json objects in a Datadog-defined format
+	 * */
+	gauge, counter
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
new file mode 100644
index 0000000..bda5d47
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/test/java/org/apache/flink/metrics/datadog/DatadogHttpClientTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.datadog;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Enclosed.class)
+public class DatadogHttpClientTest {
+	public static class TestApiKey {
+		@Test(expected = IllegalArgumentException.class)
+		public void testClientWithEmptyKey() {
+			new DatadogHttpClient("");
+		}
+
+		@Test(expected = IllegalArgumentException.class)
+		public void testClientWithNullKey() {
+			new DatadogHttpClient(null);
+		}
+	}
+
+	@RunWith(PowerMockRunner.class)
+	@PrepareForTest(DMetric.class)
+	public static class TestSerialization {
+		private static List<String> tags = Arrays.asList("tag1", "tag2");
+
+		private static final long MOCKED_SYSTEM_MILLIS = 123L;
+
+		@Before
+		public void mockSystemMillis() {
+			PowerMockito.mockStatic(DMetric.class);
+			PowerMockito.when(DMetric.getUnixEpochTimestamp()).thenReturn(MOCKED_SYSTEM_MILLIS);
+		}
+
+		@Test
+		public void serializeGauge() throws JsonProcessingException {
+
+			DGauge g = new DGauge(new Gauge<Number>() {
+				@Override
+				public Number getValue() {
+					return 1;
+				}
+			}, "testCounter", "localhost", tags);
+
+			assertEquals(
+				"{\"metric\":\"testCounter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+				DatadogHttpClient.serialize(g));
+		}
+
+		@Test
+		public void serializeGaugeWithoutHost() throws JsonProcessingException {
+
+			DGauge g = new DGauge(new Gauge<Number>() {
+				@Override
+				public Number getValue() {
+					return 1;
+				}
+			}, "testCounter", null, tags);
+
+			assertEquals(
+				"{\"metric\":\"testCounter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+				DatadogHttpClient.serialize(g));
+		}
+
+		@Test
+		public void serializeCounter() throws JsonProcessingException {
+			DCounter c = new DCounter(new Counter() {
+				@Override
+				public void inc() {}
+
+				@Override
+				public void inc(long n) {}
+
+				@Override
+				public void dec() {}
+
+				@Override
+				public void dec(long n) {}
+
+				@Override
+				public long getCount() {
+					return 1;
+				}
+			}, "testCounter", "localhost", tags);
+
+			assertEquals(
+				"{\"metric\":\"testCounter\",\"type\":\"counter\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+				DatadogHttpClient.serialize(c));
+		}
+
+		@Test
+		public void serializeCounterWithoutHost() throws JsonProcessingException {
+			DCounter c = new DCounter(new Counter() {
+				@Override
+				public void inc() {}
+
+				@Override
+				public void inc(long n) {}
+
+				@Override
+				public void dec() {}
+
+				@Override
+				public void dec(long n) {}
+
+				@Override
+				public long getCount() {
+					return 1;
+				}
+			}, "testCounter", null, tags);
+
+			assertEquals(
+				"{\"metric\":\"testCounter\",\"type\":\"counter\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1]]}",
+				DatadogHttpClient.serialize(c));
+		}
+
+		@Test
+		public void serializeMeter() throws JsonProcessingException {
+
+			DMeter m = new DMeter(new Meter() {
+				@Override
+				public void markEvent() {}
+
+				@Override
+				public void markEvent(long n) {}
+
+				@Override
+				public double getRate() {
+					return 1;
+				}
+
+				@Override
+				public long getCount() {
+					return 0;
+				}
+			}, "testMeter","localhost", tags);
+
+			assertEquals(
+				"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"host\":\"localhost\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
+				DatadogHttpClient.serialize(m));
+		}
+
+		@Test
+		public void serializeMeterWithoutHost() throws JsonProcessingException {
+
+			DMeter m = new DMeter(new Meter() {
+				@Override
+				public void markEvent() {}
+
+				@Override
+				public void markEvent(long n) {}
+
+				@Override
+				public double getRate() {
+					return 1;
+				}
+
+				@Override
+				public long getCount() {
+					return 0;
+				}
+			}, "testMeter", null, tags);
+
+			assertEquals(
+				"{\"metric\":\"testMeter\",\"type\":\"gauge\",\"tags\":[\"tag1\",\"tag2\"],\"points\":[[123,1.0]]}",
+				DatadogHttpClient.serialize(m));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-datadog/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/54ceec16/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
index 317dde8..e1b66c2 100644
--- a/flink-metrics/pom.xml
+++ b/flink-metrics/pom.xml
@@ -40,6 +40,7 @@ under the License.
 		<module>flink-metrics-graphite</module>
 		<module>flink-metrics-jmx</module>
 		<module>flink-metrics-statsd</module>
+		<module>flink-metrics-datadog</module>
 	</modules>
 
 	<!-- override these root dependencies as 'provided', so they don't end up


[5/7] flink git commit: [FLINK-6396] Fix FsSavepointStreamFactoryTest on Windows

Posted by ch...@apache.org.
[FLINK-6396] Fix FsSavepointStreamFactoryTest on Windows

This closes #3789.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/236b373e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/236b373e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/236b373e

Branch: refs/heads/master
Commit: 236b373e3d314f3a01e89ae596a0cf1a8fd2ad55
Parents: c102547
Author: zentol <ch...@apache.org>
Authored: Thu Apr 27 15:56:48 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 22:56:49 2017 +0200

----------------------------------------------------------------------
 .../state/filesystem/FsSavepointStreamFactoryTest.java       | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/236b373e/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
index a29d29c..3095a09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
 import org.junit.Rule;
@@ -48,7 +49,8 @@ public class FsSavepointStreamFactoryTest {
 				jobId,
 				0);
 
-		File[] listed = testRoot.listFiles();
+		Path root = new Path(testRoot.getAbsolutePath());
+		FileStatus[] listed = root.getFileSystem().listStatus(root);
 		assertNotNull(listed);
 		assertEquals(0, listed.length);
 
@@ -59,9 +61,9 @@ public class FsSavepointStreamFactoryTest {
 
 		FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle();
 
-		listed = testRoot.listFiles();
+		listed = root.getFileSystem().listStatus(root);
 		assertNotNull(listed);
 		assertEquals(1, listed.length);
-		assertEquals(handle.getFilePath().getPath(), listed[0].getPath());
+		assertEquals(handle.getFilePath().getPath(), listed[0].getPath().getPath());
 	}
 }


[6/7] flink git commit: [FLINK-5720] Deprecate DataStream#fold()

Posted by ch...@apache.org.
[FLINK-5720] Deprecate DataStream#fold()

This closes #3816.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50baec6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50baec6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50baec6e

Branch: refs/heads/master
Commit: 50baec6e8ec28663c5db70e0b95b0c8f78c3e3cd
Parents: 236b373
Author: zentol <ch...@apache.org>
Authored: Wed May 3 15:49:03 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 22:56:49 2017 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBFoldingState.java      |  3 +++
 .../flink/api/common/functions/FoldFunction.java  |  3 +++
 .../api/common/functions/RichFoldFunction.java    |  3 +++
 .../api/common/functions/RuntimeContext.java      |  3 +++
 .../flink/api/common/state/FoldingState.java      |  3 +++
 .../api/common/state/FoldingStateDescriptor.java  |  3 +++
 .../flink/api/common/state/KeyedStateStore.java   |  5 ++++-
 .../flink/api/common/state/StateBinder.java       |  3 +++
 .../flink/api/java/typeutils/TypeExtractor.java   |  8 ++++++++
 .../runtime/state/AbstractKeyedStateBackend.java  |  5 ++++-
 .../runtime/state/heap/HeapFoldingState.java      |  7 +++++--
 .../state/internal/InternalFoldingState.java      |  3 +++
 .../api/datastream/AllWindowedStream.java         | 18 ++++++++++++++++++
 .../streaming/api/datastream/KeyedStream.java     |  6 ++++++
 .../streaming/api/datastream/WindowedStream.java  | 18 ++++++++++++++++++
 .../windowing/FoldApplyAllWindowFunction.java     |  3 +++
 .../FoldApplyProcessAllWindowFunction.java        |  3 +++
 .../windowing/FoldApplyProcessWindowFunction.java |  3 +++
 .../windowing/FoldApplyWindowFunction.java        |  3 +++
 .../api/operators/StreamGroupedFold.java          |  3 +++
 .../streaming/api/scala/AllWindowedStream.scala   | 10 ++++++++--
 .../flink/streaming/api/scala/KeyedStream.scala   |  3 +++
 .../streaming/api/scala/WindowedStream.scala      |  8 +++++++-
 23 files changed, 120 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 26dc3dd..d5d9fce 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -39,7 +39,10 @@ import java.io.IOException;
  * @param <N> The type of the namespace.
  * @param <T> The type of the values that can be folded into the state.
  * @param <ACC> The type of the value in the folding state.
+ *
+ * @deprecated will be removed in a future version
  */
+@Deprecated
 public class RocksDBFoldingState<K, N, T, ACC>
 	extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC>
 	implements InternalFoldingState<N, T, ACC> {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
index b52828e..b3fd700 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
@@ -38,8 +38,11 @@ import java.io.Serializable;
  *
  * @param <T> Type of the initial input and the returned element
  * @param <O> Type of the elements that the group/list/stream contains
+ *
+ * @deprecated use {@link AggregateFunction} instead
  */
 @Public
+@Deprecated
 public interface FoldFunction<O, T> extends Function, Serializable {
 	/**
 	 * The core method of FoldFunction, combining two values into one value of the same type.

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
index 245550d..516e1b4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
@@ -28,8 +28,11 @@ import org.apache.flink.annotation.Public;
  *
  * @param <T> Type of the initial input and the returned element
  * @param <O> Type of the elements that the group/list/stream contains
+ *
+ * @deprecated use {@link RichAggregateFunction} instead
  */
 @Public
+@Deprecated
 public abstract class RichFoldFunction<O, T> extends AbstractRichFunction implements FoldFunction<O, T> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 2978f3a..38155f6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -394,8 +394,11 @@ public interface RuntimeContext {
 	 *
 	 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
 	 *                                       function (function is not part of a KeyedStream).
+	 *
+	 * @deprecated will be removed in a future version
 	 */
 	@PublicEvolving
+	@Deprecated
 	<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
index 684a612..7e45399 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
@@ -35,6 +35,9 @@ import org.apache.flink.annotation.PublicEvolving;
  * 
  * @param <T> Type of the values folded into the state
  * @param <ACC> Type of the value in the state
+ *
+ * @deprecated will be removed in a future version
  */
 @PublicEvolving
+@Deprecated
 public interface FoldingState<T, ACC> extends AppendingState<T, ACC> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 73bfaa8..f7609c3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -32,8 +32,11 @@ import static java.util.Objects.requireNonNull;
  *
  * @param <T> Type of the values folded int othe state
  * @param <ACC> Type of the value in the state
+ *
+ * @deprecated will be removed in a future version in favor of {@link AggregatingStateDescriptor}
  */
 @PublicEvolving
+@Deprecated
 public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState<T, ACC>, ACC> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index 2187f6c..ea9ac41 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -193,8 +193,11 @@ public interface KeyedStateStore {
 	 *
 	 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
 	 *                                       function (function is not part of a KeyedStream).
+	 *
+	 * @deprecated will be removed in a future version
 	 */
 	@PublicEvolving
+	@Deprecated
 	<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
 
 	/**
@@ -236,4 +239,4 @@ public interface KeyedStateStore {
 	 */
 	@PublicEvolving
 	<UK, UV> MapState<UK,UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
index 9df7a47..a373923 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
@@ -68,7 +68,10 @@ public interface StateBinder {
 	 *
 	 * @param <T> Type of the values folded into the state
 	 * @param <ACC> Type of the value in the state
+	 *
+	 * @deprecated will be removed in a future version
 	 */
+	@Deprecated
 	<T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a5f236f..f1bf957 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -177,13 +177,21 @@ public class TypeExtractor {
 		return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing);
 	}
 
+	/**
+	 * @deprecated will be removed in a future version
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType)
 	{
 		return getFoldReturnTypes(foldInterface, inType, null, false);
 	}
 
+	/**
+	 * @deprecated will be removed in a future version
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false, false, inType, functionName, allowMissing);

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 47ebe3b..2b225df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -195,8 +195,11 @@ public abstract class AbstractKeyedStateBackend<K>
 	 *
 	 * @param <N> The type of the namespace.
 	 * @param <T> Type of the values folded into the state
-	 * @param <ACC> Type of the value in the state	 *
+	 * @param <ACC> Type of the value in the state
+	 *
+	 * @deprecated will be removed in a future version
 	 */
+	@Deprecated
 	protected abstract <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
 			TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index dad6d0d..3a77cca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -36,7 +36,10 @@ import java.io.IOException;
  * @param <N> The type of the namespace.
  * @param <T> The type of the values that can be folded into the state.
  * @param <ACC> The type of the value in the folding state.
+ *
+ * @deprecated will be removed in a future version
  */
+@Deprecated
 public class HeapFoldingState<K, N, T, ACC>
 		extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
 		implements InternalFoldingState<N, T, ACC> {
@@ -84,7 +87,7 @@ public class HeapFoldingState<K, N, T, ACC>
 		}
 	}
 
-	static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
+	private static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
 
 		private final FoldingStateDescriptor<T, ACC> stateDescriptor;
 		private final FoldFunction<T, ACC> foldFunction;
@@ -99,4 +102,4 @@ public class HeapFoldingState<K, N, T, ACC>
 			return foldFunction.fold((previousState != null) ? previousState : stateDescriptor.getDefaultValue(), value);
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
index eb58ce5..4ef258f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
@@ -28,5 +28,8 @@ import org.apache.flink.api.common.state.FoldingState;
  * @param <N> The type of the namespace
  * @param <T> Type of the values folded into the state
  * @param <ACC> Type of the value in the state
+ *
+ * @deprecated will be removed in a future version
  */
+@Deprecated
 public interface InternalFoldingState<N, T, ACC> extends InternalAppendingState<N, T, ACC>, FoldingState<T, ACC> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 0d953a9..7ea65fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -754,7 +754,10 @@ public class AllWindowedStream<T, W extends Window> {
 	 *
 	 * @param function The fold function.
 	 * @return The data stream that is the result of applying the fold function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction)} instead
 	 */
+	@Deprecated
 	public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
 		if (function instanceof RichFunction) {
 			throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " +
@@ -774,7 +777,10 @@ public class AllWindowedStream<T, W extends Window> {
 	 *
 	 * @param function The fold function.
 	 * @return The data stream that is the result of applying the fold function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, TypeInformation, TypeInformation)} instead
 	 */
+	@Deprecated
 	public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
 		if (function instanceof RichFunction) {
 			throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " +
@@ -795,8 +801,11 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param foldFunction The fold function that is used for incremental aggregation.
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, AllWindowFunction)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, AllWindowFunction<ACC, R, W> function) {
 
 		TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
@@ -821,8 +830,11 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param foldAccumulatorType Type information for the result type of the fold function
 	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, AllWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
 			FoldFunction<T, ACC> foldFunction,
 			AllWindowFunction<ACC, R, W> function,
@@ -901,8 +913,11 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param foldFunction The fold function that is used for incremental aggregation.
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> function) {
 
 		TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
@@ -927,8 +942,11 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param foldAccumulatorType Type information for the result type of the fold function
 	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
 			FoldFunction<T, ACC> foldFunction,
 			ProcessAllWindowFunction<ACC, R, W> function,

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 9334c66..e3171c3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -416,7 +416,10 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * @param initialValue
 	 *            The initialValue passed to the folders for each key.
 	 * @return The transformed DataStream.
+	 *
+	 * @deprecated will be removed in a future version
 	 */
+	@Deprecated
 	public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> folder) {
 
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(
@@ -748,8 +751,11 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * @param queryableStateName Name under which to the publish the queryable state instance
 	 * @param stateDescriptor State descriptor to create state instance from
 	 * @return Queryable state instance
+	 *
+	 * @deprecated will be removed in a future version
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC> QueryableStateStream<KEY, ACC> asQueryableState(
 			String queryableStateName,
 			FoldingStateDescriptor<T, ACC> stateDescriptor) {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 2d7dafe..7913e95 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -487,7 +487,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 *
 	 * @param function The fold function.
 	 * @return The data stream that is the result of applying the fold function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction)} instead
 	 */
+	@Deprecated
 	public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
 		if (function instanceof RichFunction) {
 			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
@@ -507,7 +510,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 *
 	 * @param function The fold function.
 	 * @return The data stream that is the result of applying the fold function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, TypeInformation, TypeInformation)} instead
 	 */
+	@Deprecated
 	public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
 		if (function instanceof RichFunction) {
 			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
@@ -528,8 +534,11 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param foldFunction The fold function that is used for incremental aggregation.
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, R, K, W> function) {
 
 		TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
@@ -554,8 +563,11 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param foldAccumulatorType Type information for the result type of the fold function
 	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
 			FoldFunction<T, ACC> foldFunction,
 			WindowFunction<ACC, R, K, W> function,
@@ -638,8 +650,11 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param foldFunction The fold function that is used for incremental aggregation.
 	 * @param windowFunction The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, ProcessWindowFunction)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <R, ACC> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction) {
 		if (foldFunction instanceof RichFunction) {
 			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction.");
@@ -667,7 +682,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param windowFunction The process window function.
 	 * @param windowResultType The process window function result type.
 	 * @return The data stream that is the result of applying the fold function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, ProcessWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
 	 */
+	@Deprecated
 	@Internal
 	public <R, ACC> SingleOutputStreamOperator<R> fold(
 			ACC initialValue,

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
index 30662f0..2069f7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
@@ -37,8 +37,11 @@ import org.apache.flink.util.Collector;
 /**
  * Internal {@link AllWindowFunction} that is used for implementing a fold on a window configuration
  * that only allows {@link AllWindowFunction} and cannot directly execute a {@link FoldFunction}.
+ *
+ * @deprecated will be removed in a future version
  */
 @Internal
+@Deprecated
 public class FoldApplyAllWindowFunction<W extends Window, T, ACC, R>
 	extends WrappingFunction<AllWindowFunction<ACC, R, W>>
 	implements AllWindowFunction<T, R, W>, OutputTypeConfigurable<R> {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index b96a8ff..1d39252 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -39,8 +39,11 @@ import org.apache.flink.util.Collector;
  * Internal {@link ProcessAllWindowFunction} that is used for implementing a fold on a window
  * configuration that only allows {@link ProcessAllWindowFunction} and cannot directly execute a
  * {@link FoldFunction}.
+ *
+ * @deprecated will be removed in a future version
  */
 @Internal
+@Deprecated
 public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
 	extends ProcessAllWindowFunction<T, R, W>
 	implements OutputTypeConfigurable<R> {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
index 98f5622..fa4fe86 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -39,8 +39,11 @@ import org.apache.flink.util.Collector;
  * Internal {@link ProcessWindowFunction} that is used for implementing a fold on a window
  * configuration that only allows {@link ProcessWindowFunction} and cannot directly execute a
  * {@link FoldFunction}.
+ *
+ * @deprecated will be removed in a future version
  */
 @Internal
+@Deprecated
 public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
 	extends ProcessWindowFunction<T, R, K, W>
 	implements OutputTypeConfigurable<R> {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
index 770deb0..865dbc9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
@@ -37,8 +37,11 @@ import org.apache.flink.util.Collector;
 /**
  * Internal {@link WindowFunction} that is used for implementing a fold on a window configuration
  * that only allows {@link WindowFunction} and cannot directly execute a {@link FoldFunction}.
+ *
+ * @deprecated will be removed in a future version
  */
 @Internal
+@Deprecated
 public class FoldApplyWindowFunction<K, W extends Window, T, ACC, R>
 	extends WrappingFunction<WindowFunction<ACC, R, K, W>>
 	implements WindowFunction<T, R, K, W>, OutputTypeConfigurable<R> {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 1ed7178..07c5c90 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -35,8 +35,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 /**
  * A {@link StreamOperator} for executing a {@link FoldFunction} on a
  * {@link org.apache.flink.streaming.api.datastream.KeyedStream}.
+ *
+ * @deprecated will be removed in a future version
  */
 @Internal
+@Deprecated
 public class StreamGroupedFold<IN, OUT, KEY>
 		extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
 		implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 757e45f..bbdcf4a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -401,7 +401,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    *
    * @param function The fold function.
    * @return The data stream that is the result of applying the fold function to the window.
-   */
+    */
+  @deprecated("use [[aggregate()]] instead")
   def fold[R: TypeInformation](
       initialValue: R,
       function: FoldFunction[T,R]): DataStream[R] = {
@@ -421,7 +422,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    *
    * @param function The fold function.
    * @return The data stream that is the result of applying the fold function to the window.
-   */
+    */
+  @deprecated("use [[aggregate()]] instead")
   def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] = {
     if (function == null) {
       throw new NullPointerException("Fold function must not be null.")
@@ -444,6 +446,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   def fold[ACC: TypeInformation, R: TypeInformation](
       initialValue: ACC,
       preAggregator: FoldFunction[T, ACC],
@@ -474,6 +477,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     * @param windowFunction The process window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   @PublicEvolving
   def fold[ACC: TypeInformation, R: TypeInformation](
       initialValue: ACC,
@@ -505,6 +509,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   def fold[ACC: TypeInformation, R: TypeInformation](
       initialValue: ACC,
       preAggregator: (ACC, T) => ACC,
@@ -540,6 +545,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   @PublicEvolving
   def fold[ACC: TypeInformation, R: TypeInformation](
       initialValue: ACC,

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index d5ef89f..aaeb1ec 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -184,6 +184,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    * using an associative fold function and an initial value. An independent 
    * aggregate is kept per key.
    */
+  @deprecated("will be removed in a future version")
   def fold[R: TypeInformation](initialValue: R, folder: FoldFunction[T,R]): 
       DataStream[R] = {
     if (folder == null) {
@@ -201,6 +202,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    * using an associative fold function and an initial value. An independent 
    * aggregate is kept per key.
    */
+  @deprecated("will be removed in a future version")
   def fold[R: TypeInformation](initialValue: R)(fun: (R,T) => R): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("Fold function must not be null.")
@@ -507,6 +509,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     * @return Queryable state instance
     */
   @PublicEvolving
+  @deprecated("will be removed in a future version")
   def asQueryableState[ACC](
       queryableStateName: String,
       stateDescriptor: FoldingStateDescriptor[T, ACC]) : QueryableStateStream[K, ACC] =  {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 4e0e1a4..0f8a6e0 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -382,6 +382,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @param function The fold function.
    * @return The data stream that is the result of applying the fold function to the window.
    */
+  @deprecated("use [[aggregate()]] instead")
   def fold[R: TypeInformation](
       initialValue: R,
       function: FoldFunction[T,R]): DataStream[R] = {
@@ -401,7 +402,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    *
    * @param function The fold function.
    * @return The data stream that is the result of applying the fold function to the window.
-   */
+    */
+  @deprecated("use [[aggregate()]] instead")
   def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] = {
     if (function == null) {
       throw new NullPointerException("Fold function must not be null.")
@@ -423,6 +425,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   def fold[ACC: TypeInformation, R: TypeInformation](
       initialValue: ACC,
       foldFunction: FoldFunction[T, ACC],
@@ -452,6 +455,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   def fold[ACC: TypeInformation, R: TypeInformation](
       initialValue: ACC,
       foldFunction: (ACC, T) => ACC,
@@ -486,6 +490,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     * @param function The process window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   @PublicEvolving
   def fold[R: TypeInformation, ACC: TypeInformation](
       initialValue: ACC,
@@ -516,6 +521,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     * @param function The process window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   @PublicEvolving
   def fold[R: TypeInformation, ACC: TypeInformation](
       initialValue: ACC,