You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/15 10:57:16 UTC

[4/4] flink git commit: [FLINK-1450] Several minor stream fold fixes

[FLINK-1450] Several minor stream fold fixes

Closes #481


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aaa231bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aaa231bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aaa231bc

Branch: refs/heads/master
Commit: aaa231bc09dabd57828836ba50ab5e74b6975e5b
Parents: 5f8ba8e
Author: Gyula Fora <gy...@apache.org>
Authored: Sat Mar 14 22:54:19 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Sun Mar 15 10:50:40 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/api/common/functions/FoldFunction.java   |  2 +-
 .../flink/api/common/functions/RichFoldFunction.java      |  2 +-
 .../apache/flink/api/java/typeutils/TypeExtractor.java    |  4 ++--
 .../apache/flink/streaming/api/datastream/DataStream.java |  2 +-
 .../flink/streaming/api/datastream/DiscretizedStream.java |  2 +-
 .../flink/streaming/api/datastream/GroupedDataStream.java |  2 +-
 .../streaming/api/datastream/WindowedDataStream.java      |  8 ++++----
 .../api/invokable/operator/GroupedFoldInvokable.java      |  8 +++++---
 .../api/invokable/operator/StreamFoldInvokable.java       |  4 ++--
 .../api/invokable/operator/windowing/WindowFolder.java    |  8 ++++----
 .../api/invokable/operator/GroupedFoldInvokableTest.java  |  4 ++--
 .../streaming/api/invokable/operator/StreamFoldTest.java  |  2 +-
 .../invokable/operator/windowing/WindowFolderTest.java    |  2 +-
 .../org/apache/flink/streaming/api/scala/DataStream.scala |  6 +++---
 .../flink/streaming/api/scala/WindowedDataStream.scala    | 10 +++++-----
 15 files changed, 34 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
index 0228dc1..a9c5b2b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
@@ -37,7 +37,7 @@ import java.io.Serializable;
  * @param <T> Type of the initial input and the returned element
  * @param <O> Type of the elements that the group/list/stream contains
  */
-public interface FoldFunction<T, O> extends Function, Serializable {
+public interface FoldFunction<O,T> extends Function, Serializable {
 	/**
 	 * The core method of FoldFunction, combining two values into one value of the same type.
 	 * The fold function is consecutively applied to all values of a group until only a single value remains.

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
index 5abee70..3663823 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.common.functions.RichFunction;
  * @param <T> Type of the initial input and the returned element
  * @param <O> Type of the elements that the group/list/stream contains
  */
-public abstract class RichFoldFunction<T, O> extends AbstractRichFunction implements FoldFunction<T, O> {
+public abstract class RichFoldFunction<O, T> extends AbstractRichFunction implements FoldFunction<O, T> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index c5e2e8f..fdfe941 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -102,12 +102,12 @@ public class TypeExtractor {
 		return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing);
 	}
 
-	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<OUT, IN> foldInterface, TypeInformation<IN> inType)
+	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType)
 	{
 		return getFoldReturnTypes(foldInterface, inType, null, false);
 	}
 
-	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<OUT, IN> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
+	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false, false, inType, functionName, allowMissing);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 1ccc24d..53179d7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -535,7 +535,7 @@ public class DataStream<OUT> {
 	 *          of the input values.
 	 * @return The transformed DataStream
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> fold(FoldFunction<R, OUT> folder, R initialValue) {
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) {
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
 
 		return transform("Fold", outType, new StreamFoldInvokable<OUT, R>(clean(folder), initialValue, outType));

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 5c0be93..6526aa6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -110,7 +110,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 	}
 
 	@Override
-	public <R> DiscretizedStream<R> foldWindow(FoldFunction<R, OUT> foldFunction, R initialValue,
+	public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction,
 			TypeInformation<R> outType) {
 
 		DiscretizedStream<R> out = partition(transformation).transform(

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index a57370d..e3c22da 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -91,7 +91,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 */
 
 	@Override
-	public <R> SingleOutputStreamOperator<R, ?> fold(FoldFunction<R, OUT> folder, R initialValue) {
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) {
 
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 7c93588..c80546f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -293,11 +293,11 @@ public class WindowedDataStream<OUT> {
 	 *            The output type of the operator
 	 * @return The transformed DataStream
 	 */
-	public <R> DiscretizedStream<R> foldWindow(FoldFunction<R, OUT> foldFunction, R initialValue,
+	public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction,
 			TypeInformation<R> outType) {
 
 		return discretize(WindowTransformation.FOLDWINDOW.with(clean(foldFunction)),
-				new BasicWindowBuffer<OUT>()).foldWindow(foldFunction, initialValue, outType);
+				new BasicWindowBuffer<OUT>()).foldWindow(initialValue, foldFunction, outType);
 
 	}
 
@@ -313,11 +313,11 @@ public class WindowedDataStream<OUT> {
 	 *            Initial value given to foldFunction
 	 * @return The transformed DataStream
 	 */
-	public <R> DiscretizedStream<R> foldWindow(FoldFunction<R, OUT> foldFunction, R initialValue) {
+	public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction) {
 
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(foldFunction),
 				getType());
-		return foldWindow(foldFunction, initialValue, outType);
+		return foldWindow(initialValue, foldFunction, outType);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
index 62876b0..d4b5f7e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
@@ -32,7 +32,8 @@ public class GroupedFoldInvokable<IN, OUT> extends StreamFoldInvokable<IN, OUT>
 	private OUT folded;
 	private OUT initialValue;
 
-	public GroupedFoldInvokable(FoldFunction<OUT, IN> folder, KeySelector<IN, ?> keySelector, OUT initialValue, TypeInformation<OUT> outTypeInformation) {
+	public GroupedFoldInvokable(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector,
+			OUT initialValue, TypeInformation<OUT> outTypeInformation) {
 		super(folder, initialValue, outTypeInformation);
 		this.keySelector = keySelector;
 		this.initialValue = initialValue;
@@ -49,8 +50,9 @@ public class GroupedFoldInvokable<IN, OUT> extends StreamFoldInvokable<IN, OUT>
 			values.put(key, folded);
 			collector.collect(folded);
 		} else {
-			values.put(key, initialValue);
-			collector.collect(initialValue);
+			OUT first = folded = folder.fold(outTypeSerializer.copy(initialValue), nextValue);
+			values.put(key, first);
+			collector.collect(first);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
index 205afe6..07ed022 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
@@ -25,12 +25,12 @@ import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	protected FoldFunction<OUT, IN> folder;
+	protected FoldFunction<IN, OUT> folder;
 	protected OUT accumulator;
 	protected IN nextValue;
 	protected TypeSerializer<OUT> outTypeSerializer;
 
-	public StreamFoldInvokable(FoldFunction<OUT, IN> folder, OUT initialValue, TypeInformation<OUT> outTypeInformation) {
+	public StreamFoldInvokable(FoldFunction<IN, OUT> folder, OUT initialValue, TypeInformation<OUT> outTypeInformation) {
 		super(folder);
 		this.folder = folder;
 		this.accumulator = initialValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
index ffee070..162cb3c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
@@ -30,9 +30,9 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
 public class WindowFolder<IN, OUT> extends MapInvokable<StreamWindow<IN>, StreamWindow<OUT>> {
 
 	private static final long serialVersionUID = 1L;
-	FoldFunction<OUT, IN> folder;
+	FoldFunction<IN, OUT> folder;
 
-	public WindowFolder(FoldFunction<OUT, IN> folder, OUT initialValue) {
+	public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) {
 		super(new WindowFoldFunction<IN, OUT>(folder, initialValue));
 		this.folder = folder;
 	}
@@ -42,9 +42,9 @@ public class WindowFolder<IN, OUT> extends MapInvokable<StreamWindow<IN>, Stream
 
 		private static final long serialVersionUID = 1L;
 		private OUT initialValue;
-		FoldFunction<OUT, IN> folder;
+		FoldFunction<IN, OUT> folder;
 
-		public WindowFoldFunction(FoldFunction<OUT, IN> folder, OUT initialValue) {
+		public WindowFoldFunction(FoldFunction<IN, OUT> folder, OUT initialValue) {
 			this.folder = folder;
 			this.initialValue = initialValue;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
index 176c324..01375d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
@@ -32,7 +32,7 @@ import org.junit.Test;
 
 public class GroupedFoldInvokableTest {
 
-	private static class MyFolder implements FoldFunction<String, Integer> {
+	private static class MyFolder implements FoldFunction<Integer, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -58,7 +58,7 @@ public class GroupedFoldInvokableTest {
 			}
 		}, "100", outType);
 
-		List<String> expected = Arrays.asList("100", "1001", "100", "1002", "100");
+		List<String> expected = Arrays.asList("1001","10011", "1002", "10022", "1003");
 		List<String> actual = MockContext.createAndExecute(invokable1,
 				Arrays.asList(1, 1, 2, 2, 3));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
index 0604ab1..90a133b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
 
 public class StreamFoldTest {
 
-	private static class MyFolder implements FoldFunction<String, Integer> {
+	private static class MyFolder implements FoldFunction<Integer, String> {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
index 53a6bb8..ccc01e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
@@ -33,7 +33,7 @@ public class WindowFolderTest {
 	@Test
 	public void test() {
 		StreamInvokable<StreamWindow<Integer>, StreamWindow<String>> windowReducer = new WindowFolder<Integer,String>(
-				new FoldFunction<String,Integer>() {
+				new FoldFunction<Integer, String>() {
 
 					private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 18b7bf9..59b1906 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -442,7 +442,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Creates a new [[DataStream]] by folding the elements of this DataStream
    * using an associative fold function and an initial value.
    */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[R,T]): 
+  def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]): 
   DataStream[R] = {
     if (folder == null) {
       throw new NullPointerException("Fold function must not be null.")
@@ -460,11 +460,11 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Creates a new [[DataStream]] by folding the elements of this DataStream
    * using an associative fold function and an initial value.
    */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, fun: (R,T) => R): DataStream[R] = {
+  def fold[R: TypeInformation: ClassTag](initialValue: R)(fun: (R,T) => R): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
-    val folder = new FoldFunction[R,T] {
+    val folder = new FoldFunction[T,R] {
       val cleanFun = clean(fun)
 
       def fold(acc: R, v: T) = {

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index 910f1f9..3bd5b24 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -138,12 +138,12 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    * the current window at every trigger.
    *
    */
-  def foldWindow[R: TypeInformation: ClassTag](folder: FoldFunction[R,T], initialValue: R): 
+  def foldWindow[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]): 
   WindowedDataStream[R] = {
     if (folder == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
-    javaStream.foldWindow(folder, initialValue, implicitly[TypeInformation[R]])
+    javaStream.foldWindow(initialValue, folder, implicitly[TypeInformation[R]])
   }
 
   /**
@@ -151,16 +151,16 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    * the current window at every trigger.
    *
    */
-  def foldWindow[R: TypeInformation: ClassTag](initialValue: R, fun: (R, T) => R): 
+  def foldWindow[R: TypeInformation: ClassTag](initialValue: R)(fun: (R, T) => R): 
   WindowedDataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
-    val folder = new FoldFunction[R,T] {
+    val folder = new FoldFunction[T,R] {
       val cleanFun = clean(fun)
       def fold(acc: R, v: T) = { cleanFun(acc, v) }
     }
-    foldWindow(folder, initialValue)
+    foldWindow(initialValue, folder)
   }
 
   /**