You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/10 07:06:34 UTC

[6/7] flink git commit: [FLINK-5720] Deprecate DataStream#fold()

[FLINK-5720] Deprecate DataStream#fold()

This closes #3816.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50baec6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50baec6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50baec6e

Branch: refs/heads/master
Commit: 50baec6e8ec28663c5db70e0b95b0c8f78c3e3cd
Parents: 236b373
Author: zentol <ch...@apache.org>
Authored: Wed May 3 15:49:03 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue May 9 22:56:49 2017 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBFoldingState.java      |  3 +++
 .../flink/api/common/functions/FoldFunction.java  |  3 +++
 .../api/common/functions/RichFoldFunction.java    |  3 +++
 .../api/common/functions/RuntimeContext.java      |  3 +++
 .../flink/api/common/state/FoldingState.java      |  3 +++
 .../api/common/state/FoldingStateDescriptor.java  |  3 +++
 .../flink/api/common/state/KeyedStateStore.java   |  5 ++++-
 .../flink/api/common/state/StateBinder.java       |  3 +++
 .../flink/api/java/typeutils/TypeExtractor.java   |  8 ++++++++
 .../runtime/state/AbstractKeyedStateBackend.java  |  5 ++++-
 .../runtime/state/heap/HeapFoldingState.java      |  7 +++++--
 .../state/internal/InternalFoldingState.java      |  3 +++
 .../api/datastream/AllWindowedStream.java         | 18 ++++++++++++++++++
 .../streaming/api/datastream/KeyedStream.java     |  6 ++++++
 .../streaming/api/datastream/WindowedStream.java  | 18 ++++++++++++++++++
 .../windowing/FoldApplyAllWindowFunction.java     |  3 +++
 .../FoldApplyProcessAllWindowFunction.java        |  3 +++
 .../windowing/FoldApplyProcessWindowFunction.java |  3 +++
 .../windowing/FoldApplyWindowFunction.java        |  3 +++
 .../api/operators/StreamGroupedFold.java          |  3 +++
 .../streaming/api/scala/AllWindowedStream.scala   | 10 ++++++++--
 .../flink/streaming/api/scala/KeyedStream.scala   |  3 +++
 .../streaming/api/scala/WindowedStream.scala      |  8 +++++++-
 23 files changed, 120 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 26dc3dd..d5d9fce 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -39,7 +39,10 @@ import java.io.IOException;
  * @param <N> The type of the namespace.
  * @param <T> The type of the values that can be folded into the state.
  * @param <ACC> The type of the value in the folding state.
+ *
+ * @deprecated will be removed in a future version
  */
+@Deprecated
 public class RocksDBFoldingState<K, N, T, ACC>
 	extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC>
 	implements InternalFoldingState<N, T, ACC> {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
index b52828e..b3fd700 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
@@ -38,8 +38,11 @@ import java.io.Serializable;
  *
  * @param <T> Type of the initial input and the returned element
  * @param <O> Type of the elements that the group/list/stream contains
+ *
+ * @deprecated use {@link AggregateFunction} instead
  */
 @Public
+@Deprecated
 public interface FoldFunction<O, T> extends Function, Serializable {
 	/**
 	 * The core method of FoldFunction, combining two values into one value of the same type.

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
index 245550d..516e1b4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
@@ -28,8 +28,11 @@ import org.apache.flink.annotation.Public;
  *
  * @param <T> Type of the initial input and the returned element
  * @param <O> Type of the elements that the group/list/stream contains
+ *
+ * @deprecated use {@link RichAggregateFunction} instead
  */
 @Public
+@Deprecated
 public abstract class RichFoldFunction<O, T> extends AbstractRichFunction implements FoldFunction<O, T> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 2978f3a..38155f6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -394,8 +394,11 @@ public interface RuntimeContext {
 	 *
 	 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
 	 *                                       function (function is not part of a KeyedStream).
+	 *
+	 * @deprecated will be removed in a future version
 	 */
 	@PublicEvolving
+	@Deprecated
 	<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
index 684a612..7e45399 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
@@ -35,6 +35,9 @@ import org.apache.flink.annotation.PublicEvolving;
  * 
  * @param <T> Type of the values folded into the state
  * @param <ACC> Type of the value in the state
+ *
+ * @deprecated will be removed in a future version
  */
 @PublicEvolving
+@Deprecated
 public interface FoldingState<T, ACC> extends AppendingState<T, ACC> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 73bfaa8..f7609c3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -32,8 +32,11 @@ import static java.util.Objects.requireNonNull;
  *
  * @param <T> Type of the values folded int othe state
  * @param <ACC> Type of the value in the state
+ *
+ * @deprecated will be removed in a future version in favor of {@link AggregatingStateDescriptor}
  */
 @PublicEvolving
+@Deprecated
 public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState<T, ACC>, ACC> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index 2187f6c..ea9ac41 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -193,8 +193,11 @@ public interface KeyedStateStore {
 	 *
 	 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
 	 *                                       function (function is not part of a KeyedStream).
+	 *
+	 * @deprecated will be removed in a future version
 	 */
 	@PublicEvolving
+	@Deprecated
 	<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);
 
 	/**
@@ -236,4 +239,4 @@ public interface KeyedStateStore {
 	 */
 	@PublicEvolving
 	<UK, UV> MapState<UK,UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
index 9df7a47..a373923 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
@@ -68,7 +68,10 @@ public interface StateBinder {
 	 *
 	 * @param <T> Type of the values folded into the state
 	 * @param <ACC> Type of the value in the state
+	 *
+	 * @deprecated will be removed in a future version
 	 */
+	@Deprecated
 	<T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a5f236f..f1bf957 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -177,13 +177,21 @@ public class TypeExtractor {
 		return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing);
 	}
 
+	/**
+	 * @deprecated will be removed in a future version
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType)
 	{
 		return getFoldReturnTypes(foldInterface, inType, null, false);
 	}
 
+	/**
+	 * @deprecated will be removed in a future version
+	 */
 	@PublicEvolving
+	@Deprecated
 	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false, false, inType, functionName, allowMissing);

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 47ebe3b..2b225df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -195,8 +195,11 @@ public abstract class AbstractKeyedStateBackend<K>
 	 *
 	 * @param <N> The type of the namespace.
 	 * @param <T> Type of the values folded into the state
-	 * @param <ACC> Type of the value in the state	 *
+	 * @param <ACC> Type of the value in the state
+	 *
+	 * @deprecated will be removed in a future version
 	 */
+	@Deprecated
 	protected abstract <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
 			TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
index dad6d0d..3a77cca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java
@@ -36,7 +36,10 @@ import java.io.IOException;
  * @param <N> The type of the namespace.
  * @param <T> The type of the values that can be folded into the state.
  * @param <ACC> The type of the value in the folding state.
+ *
+ * @deprecated will be removed in a future version
  */
+@Deprecated
 public class HeapFoldingState<K, N, T, ACC>
 		extends AbstractHeapState<K, N, ACC, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
 		implements InternalFoldingState<N, T, ACC> {
@@ -84,7 +87,7 @@ public class HeapFoldingState<K, N, T, ACC>
 		}
 	}
 
-	static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
+	private static final class FoldTransformation<T, ACC> implements StateTransformationFunction<ACC, T> {
 
 		private final FoldingStateDescriptor<T, ACC> stateDescriptor;
 		private final FoldFunction<T, ACC> foldFunction;
@@ -99,4 +102,4 @@ public class HeapFoldingState<K, N, T, ACC>
 			return foldFunction.fold((previousState != null) ? previousState : stateDescriptor.getDefaultValue(), value);
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
index eb58ce5..4ef258f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
@@ -28,5 +28,8 @@ import org.apache.flink.api.common.state.FoldingState;
  * @param <N> The type of the namespace
  * @param <T> Type of the values folded into the state
  * @param <ACC> Type of the value in the state
+ *
+ * @deprecated will be removed in a future version
  */
+@Deprecated
 public interface InternalFoldingState<N, T, ACC> extends InternalAppendingState<N, T, ACC>, FoldingState<T, ACC> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 0d953a9..7ea65fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -754,7 +754,10 @@ public class AllWindowedStream<T, W extends Window> {
 	 *
 	 * @param function The fold function.
 	 * @return The data stream that is the result of applying the fold function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction)} instead
 	 */
+	@Deprecated
 	public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
 		if (function instanceof RichFunction) {
 			throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " +
@@ -774,7 +777,10 @@ public class AllWindowedStream<T, W extends Window> {
 	 *
 	 * @param function The fold function.
 	 * @return The data stream that is the result of applying the fold function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, TypeInformation, TypeInformation)} instead
 	 */
+	@Deprecated
 	public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
 		if (function instanceof RichFunction) {
 			throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction. " +
@@ -795,8 +801,11 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param foldFunction The fold function that is used for incremental aggregation.
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, AllWindowFunction)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, AllWindowFunction<ACC, R, W> function) {
 
 		TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
@@ -821,8 +830,11 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param foldAccumulatorType Type information for the result type of the fold function
 	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, AllWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
 			FoldFunction<T, ACC> foldFunction,
 			AllWindowFunction<ACC, R, W> function,
@@ -901,8 +913,11 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param foldFunction The fold function that is used for incremental aggregation.
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> function) {
 
 		TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
@@ -927,8 +942,11 @@ public class AllWindowedStream<T, W extends Window> {
 	 * @param foldAccumulatorType Type information for the result type of the fold function
 	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, ProcessAllWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
 			FoldFunction<T, ACC> foldFunction,
 			ProcessAllWindowFunction<ACC, R, W> function,

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 9334c66..e3171c3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -416,7 +416,10 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * @param initialValue
 	 *            The initialValue passed to the folders for each key.
 	 * @return The transformed DataStream.
+	 *
+	 * @deprecated will be removed in a future version
 	 */
+	@Deprecated
 	public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> folder) {
 
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(
@@ -748,8 +751,11 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * @param queryableStateName Name under which to the publish the queryable state instance
 	 * @param stateDescriptor State descriptor to create state instance from
 	 * @return Queryable state instance
+	 *
+	 * @deprecated will be removed in a future version
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC> QueryableStateStream<KEY, ACC> asQueryableState(
 			String queryableStateName,
 			FoldingStateDescriptor<T, ACC> stateDescriptor) {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 2d7dafe..7913e95 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -487,7 +487,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 *
 	 * @param function The fold function.
 	 * @return The data stream that is the result of applying the fold function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction)} instead
 	 */
+	@Deprecated
 	public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function) {
 		if (function instanceof RichFunction) {
 			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
@@ -507,7 +510,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 *
 	 * @param function The fold function.
 	 * @return The data stream that is the result of applying the fold function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, TypeInformation, TypeInformation)} instead
 	 */
+	@Deprecated
 	public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
 		if (function instanceof RichFunction) {
 			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction. " +
@@ -528,8 +534,11 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param foldFunction The fold function that is used for incremental aggregation.
 	 * @param function The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, WindowFunction<ACC, R, K, W> function) {
 
 		TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
@@ -554,8 +563,11 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param foldAccumulatorType Type information for the result type of the fold function
 	 * @param resultType Type information for the result type of the window function
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, WindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
 			FoldFunction<T, ACC> foldFunction,
 			WindowFunction<ACC, R, K, W> function,
@@ -638,8 +650,11 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param foldFunction The fold function that is used for incremental aggregation.
 	 * @param windowFunction The window function.
 	 * @return The data stream that is the result of applying the window function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, ProcessWindowFunction)} instead
 	 */
 	@PublicEvolving
+	@Deprecated
 	public <R, ACC> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction) {
 		if (foldFunction instanceof RichFunction) {
 			throw new UnsupportedOperationException("FoldFunction can not be a RichFunction.");
@@ -667,7 +682,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 * @param windowFunction The process window function.
 	 * @param windowResultType The process window function result type.
 	 * @return The data stream that is the result of applying the fold function to the window.
+	 *
+	 * @deprecated use {@link #aggregate(AggregateFunction, ProcessWindowFunction, TypeInformation, TypeInformation, TypeInformation)} instead
 	 */
+	@Deprecated
 	@Internal
 	public <R, ACC> SingleOutputStreamOperator<R> fold(
 			ACC initialValue,

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
index 30662f0..2069f7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java
@@ -37,8 +37,11 @@ import org.apache.flink.util.Collector;
 /**
  * Internal {@link AllWindowFunction} that is used for implementing a fold on a window configuration
  * that only allows {@link AllWindowFunction} and cannot directly execute a {@link FoldFunction}.
+ *
+ * @deprecated will be removed in a future version
  */
 @Internal
+@Deprecated
 public class FoldApplyAllWindowFunction<W extends Window, T, ACC, R>
 	extends WrappingFunction<AllWindowFunction<ACC, R, W>>
 	implements AllWindowFunction<T, R, W>, OutputTypeConfigurable<R> {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index b96a8ff..1d39252 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -39,8 +39,11 @@ import org.apache.flink.util.Collector;
  * Internal {@link ProcessAllWindowFunction} that is used for implementing a fold on a window
  * configuration that only allows {@link ProcessAllWindowFunction} and cannot directly execute a
  * {@link FoldFunction}.
+ *
+ * @deprecated will be removed in a future version
  */
 @Internal
+@Deprecated
 public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
 	extends ProcessAllWindowFunction<T, R, W>
 	implements OutputTypeConfigurable<R> {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
index 98f5622..fa4fe86 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -39,8 +39,11 @@ import org.apache.flink.util.Collector;
  * Internal {@link ProcessWindowFunction} that is used for implementing a fold on a window
  * configuration that only allows {@link ProcessWindowFunction} and cannot directly execute a
  * {@link FoldFunction}.
+ *
+ * @deprecated will be removed in a future version
  */
 @Internal
+@Deprecated
 public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
 	extends ProcessWindowFunction<T, R, K, W>
 	implements OutputTypeConfigurable<R> {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
index 770deb0..865dbc9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java
@@ -37,8 +37,11 @@ import org.apache.flink.util.Collector;
 /**
  * Internal {@link WindowFunction} that is used for implementing a fold on a window configuration
  * that only allows {@link WindowFunction} and cannot directly execute a {@link FoldFunction}.
+ *
+ * @deprecated will be removed in a future version
  */
 @Internal
+@Deprecated
 public class FoldApplyWindowFunction<K, W extends Window, T, ACC, R>
 	extends WrappingFunction<WindowFunction<ACC, R, K, W>>
 	implements WindowFunction<T, R, K, W>, OutputTypeConfigurable<R> {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 1ed7178..07c5c90 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -35,8 +35,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 /**
  * A {@link StreamOperator} for executing a {@link FoldFunction} on a
  * {@link org.apache.flink.streaming.api.datastream.KeyedStream}.
+ *
+ * @deprecated will be removed in a future version
  */
 @Internal
+@Deprecated
 public class StreamGroupedFold<IN, OUT, KEY>
 		extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
 		implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 757e45f..bbdcf4a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -401,7 +401,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    *
    * @param function The fold function.
    * @return The data stream that is the result of applying the fold function to the window.
    */
+  @deprecated("use [[aggregate()]] instead")
   def fold[R: TypeInformation](
       initialValue: R,
       function: FoldFunction[T,R]): DataStream[R] = {
@@ -421,7 +422,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    *
    * @param function The fold function.
    * @return The data stream that is the result of applying the fold function to the window.
    */
+  @deprecated("use [[aggregate()]] instead")
   def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] = {
     if (function == null) {
       throw new NullPointerException("Fold function must not be null.")
@@ -444,6 +446,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   def fold[ACC: TypeInformation, R: TypeInformation](
       initialValue: ACC,
       preAggregator: FoldFunction[T, ACC],
@@ -474,6 +477,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     * @param windowFunction The process window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   @PublicEvolving
   def fold[ACC: TypeInformation, R: TypeInformation](
       initialValue: ACC,
@@ -505,6 +509,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   def fold[ACC: TypeInformation, R: TypeInformation](
       initialValue: ACC,
       preAggregator: (ACC, T) => ACC,
@@ -540,6 +545,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   @PublicEvolving
   def fold[ACC: TypeInformation, R: TypeInformation](
       initialValue: ACC,

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index d5ef89f..aaeb1ec 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -184,6 +184,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    * using an associative fold function and an initial value. An independent 
    * aggregate is kept per key.
    */
+  @deprecated("will be removed in a future version")
   def fold[R: TypeInformation](initialValue: R, folder: FoldFunction[T,R]): 
       DataStream[R] = {
     if (folder == null) {
@@ -201,6 +202,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    * using an associative fold function and an initial value. An independent 
    * aggregate is kept per key.
    */
+  @deprecated("will be removed in a future version")
   def fold[R: TypeInformation](initialValue: R)(fun: (R,T) => R): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("Fold function must not be null.")
@@ -507,6 +509,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     * @return Queryable state instance
     */
   @PublicEvolving
+  @deprecated("will be removed in a future version")
   def asQueryableState[ACC](
       queryableStateName: String,
       stateDescriptor: FoldingStateDescriptor[T, ACC]) : QueryableStateStream[K, ACC] =  {

http://git-wip-us.apache.org/repos/asf/flink/blob/50baec6e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 4e0e1a4..0f8a6e0 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -382,6 +382,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    * @param function The fold function.
    * @return The data stream that is the result of applying the fold function to the window.
    */
+  @deprecated("use [[aggregate()]] instead")
   def fold[R: TypeInformation](
       initialValue: R,
       function: FoldFunction[T,R]): DataStream[R] = {
@@ -401,7 +402,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
    *
    * @param function The fold function.
    * @return The data stream that is the result of applying the fold function to the window.
    */
+  @deprecated("use [[aggregate()]] instead")
   def fold[R: TypeInformation](initialValue: R)(function: (R, T) => R): DataStream[R] = {
     if (function == null) {
       throw new NullPointerException("Fold function must not be null.")
@@ -423,6 +425,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   def fold[ACC: TypeInformation, R: TypeInformation](
       initialValue: ACC,
       foldFunction: FoldFunction[T, ACC],
@@ -452,6 +455,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     * @param windowFunction The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   def fold[ACC: TypeInformation, R: TypeInformation](
       initialValue: ACC,
       foldFunction: (ACC, T) => ACC,
@@ -486,6 +490,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     * @param function The process window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   @PublicEvolving
   def fold[R: TypeInformation, ACC: TypeInformation](
       initialValue: ACC,
@@ -516,6 +521,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
     * @param function The process window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
+  @deprecated("use [[aggregate()]] instead")
   @PublicEvolving
   def fold[R: TypeInformation, ACC: TypeInformation](
       initialValue: ACC,