You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/15 10:57:13 UTC

[1/4] flink git commit: [FLINK-1450] Added fold operator for the Streaming API

Repository: flink
Updated Branches:
  refs/heads/master 1c269e2f2 -> aaa231bc0


[FLINK-1450] Added fold operator for the Streaming API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c99cb69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c99cb69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c99cb69

Branch: refs/heads/master
Commit: 6c99cb69665564d9731a5087559c9874efc9af37
Parents: 1c269e2
Author: Akshay Dixit <ak...@gmail.com>
Authored: Thu Mar 12 23:19:31 2015 +0530
Committer: Gyula Fora <gy...@apache.org>
Committed: Sat Mar 14 19:29:44 2015 +0100

----------------------------------------------------------------------
 .../api/common/functions/FoldFunction.java      | 53 ++++++++++++++++
 .../api/common/functions/RichFoldFunction.java  | 39 ++++++++++++
 .../flink/api/java/typeutils/TypeExtractor.java | 11 ++++
 .../streaming/api/datastream/DataStream.java    | 21 +++++++
 .../invokable/operator/StreamFoldInvokable.java | 65 ++++++++++++++++++++
 .../api/invokable/operator/StreamFoldTest.java  | 53 ++++++++++++++++
 .../flink/streaming/api/scala/DataStream.scala  | 18 +++++-
 7 files changed, 257 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
new file mode 100644
index 0000000..0228dc1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * Base interface for Fold functions. Fold functions combine groups of elements to
+ * a single value, by applying a binary operation to an initial accumulator element and every element from a group of elements.
+ * <p>
+ * The basic syntax for using a FoldFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ *
+ * X initialValue = ...;
+ * DataSet<X> result = input.fold(new MyFoldFunction(), initialValue);
+ * </blockquote></pre>
+ * <p>
+ * Like all functions, the FoldFunction needs to be serializable, as defined in {@link java.io.Serializable}.
+ *
+ * @param <T> Type of the initial input and the returned element
+ * @param <O> Type of the elements that the group/list/stream contains
+ */
+public interface FoldFunction<T, O> extends Function, Serializable {
+	/**
+	 * The core method of FoldFunction, combining the current accumulator and one value into a new accumulator.
+	 * The fold function is consecutively applied to all values of a group, carrying the accumulator from one call to the next.
+	 *
+	 * @param accumulator The initial value, and accumulator.
+	 * @param value The value from the group to "fold" into the accumulator.
+	 * @return The accumulator after the value has been folded into it.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	T fold(T accumulator, O value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
new file mode 100644
index 0000000..5abee70
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link FoldFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <T> Type of the initial input and the returned element
+ * @param <O> Type of the elements that the group/list/stream contains
+ */
+public abstract class RichFoldFunction<T, O> extends AbstractRichFunction implements FoldFunction<T, O> {
+
+	private static final long serialVersionUID = 1L;
+
+	public abstract T fold(T accumulator, O value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 5f52f98..c5e2e8f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.InvalidTypesException;
@@ -100,6 +101,16 @@ public class TypeExtractor {
 	{
 		return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing);
 	}
+
+	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<OUT, IN> foldInterface, TypeInformation<IN> inType)
+	{
+		return getFoldReturnTypes(foldInterface, inType, null, false);
+	}
+
+	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<OUT, IN> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
+	{
+		return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false, false, inType, functionName, allowMissing);
+	}
 	
 	
 	public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index f7b5f07..29d7aba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -24,10 +24,12 @@ import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichFoldFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.io.OutputFormat;
@@ -62,6 +64,7 @@ import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
 import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.StreamFoldInvokable;
 import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
 import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
@@ -521,6 +524,24 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Applies a fold transformation on the data stream. The returned stream
+	 * contains all the intermediate values of the fold transformation. The
+	 * user can also extend the {@link RichFoldFunction} to gain access to
+	 * other features provided by the {@link org.apache.flink.api.common.functions.RichFunction}
+	 * interface.
+	 *
+	 * @param folder
+	 *          The {@link FoldFunction} that will be called for every element
+	 *          of the input values.
+	 * @return The transformed DataStream
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> fold(FoldFunction<R, OUT> folder, R initialValue) {
+		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
+
+		return transform("Fold", outType, new StreamFoldInvokable<OUT, R>(clean(folder), initialValue));
+	}
+
+	/**
 	 * Applies a Filter transformation on a {@link DataStream}. The
 	 * transformation calls a {@link FilterFunction} for each element of the
 	 * DataStream and retains only those element for which the function returns

http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
new file mode 100644
index 0000000..36b75f2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	protected FoldFunction<OUT, IN> folder;
+	protected OUT accumulator;
+	protected IN nextValue;
+
+	public StreamFoldInvokable(FoldFunction<OUT, IN> folder, OUT initialValue) {
+		super(folder);
+		this.folder = folder;
+		this.accumulator = initialValue;
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		while (isRunning && readNext() != null) {
+			fold();
+		}
+	}
+
+	protected void fold() throws Exception {
+		callUserFunctionAndLogException();
+
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+
+		nextValue = nextObject;
+		accumulator = folder.fold(accumulator, copy(nextValue));
+		collector.collect(accumulator);
+
+	}
+
+	@Override
+	public void collect(IN record) {
+		if (isRunning) {
+			nextObject = copy(record);
+			callUserFunctionAndLogException();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
new file mode 100644
index 0000000..b07191c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class StreamFoldTest {
+
+	private static class MyFolder implements FoldFunction<String, Integer>{
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String fold(String accumulator, Integer value) throws Exception {
+			return accumulator + value.toString();
+		}
+	}
+
+	@Test
+	public void test() {
+		StreamFoldInvokable<Integer, String> invokable1 = new StreamFoldInvokable<Integer, String>(
+				new MyFolder(), "");
+
+		List<String> expected = Arrays.asList("1","11","112","1123","11233");
+		List<String> actual = MockContext.createAndExecute(invokable1,
+				Arrays.asList(1, 1, 2, 3, 3));
+
+		assertEquals(expected, actual);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c99cb69/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 15467bb..0b2b2b9 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -25,14 +25,13 @@ import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+import org.apache.flink.streaming.api.invokable.operator._
 import org.apache.flink.util.Collector
 import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.streaming.api.invokable.StreamInvokable
-import org.apache.flink.streaming.api.invokable.operator.{ GroupedReduceInvokable, StreamReduceInvokable }
 import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.functions.FoldFunction
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.common.functions.FilterFunction
 import org.apache.flink.streaming.api.function.sink.SinkFunction
@@ -425,6 +424,19 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
+   * Creates a new [[DataStream]] by folding the elements of this DataStream
+   * using a fold function and an initial value.
+   */
+  def fold[R: TypeInformation: ClassTag](folder: FoldFunction[R,T], initialValue: R): DataStream[R] = {
+    if (folder == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+    javaStream.transform("fold", implicitly[TypeInformation[R]],
+        new StreamFoldInvokable[T,R](folder, initialValue))
+  }
+
+
+  /**
    * Creates a new [[DataStream]] by reducing the elements of this DataStream
    * using an associative reduce function.
    */


[2/4] flink git commit: [FLINK-1450] Added GroupFoldFunction and GroupedFoldInvokable with a test. Integrated them into DataStream

Posted by gy...@apache.org.
[FLINK-1450] Added GroupFoldFunction and GroupedFoldInvokable with a test. Integrated them into DataStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f785c73e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f785c73e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f785c73e

Branch: refs/heads/master
Commit: f785c73e81348c062578319588ce24194c0a2061
Parents: 6c99cb6
Author: Akshay Dixit <ak...@gmail.com>
Authored: Sat Mar 14 02:22:38 2015 +0530
Committer: Gyula Fora <gy...@apache.org>
Committed: Sat Mar 14 19:29:53 2015 +0100

----------------------------------------------------------------------
 .../api/common/functions/GroupFoldFunction.java | 56 +++++++++++++++++
 .../api/datastream/GroupedDataStream.java       | 29 +++++++++
 .../operator/GroupedFoldInvokable.java          | 61 +++++++++++++++++++
 .../invokable/operator/StreamFoldInvokable.java |  2 +-
 .../operator/GroupedFoldInvokableTest.java      | 63 ++++++++++++++++++++
 .../flink/streaming/api/scala/DataStream.scala  | 43 +++++++++----
 6 files changed, 242 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f785c73e/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupFoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupFoldFunction.java
new file mode 100644
index 0000000..fb59c89
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupFoldFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * The interface for group fold functions. GroupFoldFunctions process groups of elements.
+ * They may aggregate them to a single value, or produce multiple result values for each group.
+ * The group may be defined by sharing a common grouping key, or the group may simply be
+ * all elements of a data set.
+ * <p>
+ * For a fold function that works incrementally by combining always two elements, see
+ * {@link FoldFunction}.
+ * <p>
+ * The basic syntax for using a grouped GroupFoldFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ *
+ * X initialValue = ...;
+ * DataSet<X> result = input.groupBy(<key-definition>).foldGroup(new MyGroupFoldFunction(), initialValue);
+ * </blockquote></pre>
+ *
+ * @param <T> Type of the elements that the group/list/stream contains
+ * @param <O> Type of the elements handed to the result collector
+ */
+public interface GroupFoldFunction<T, O> extends Function, Serializable {
+	/**
+	 * The fold method. The function receives one call per group of elements.
+	 *
+	 * @param values All records that belong to the given input key.
+	 * @param out The collector to hand results to.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	void fold(Iterable<T> values, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f785c73e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index c871c20..50ae542 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -17,10 +17,14 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.invokable.operator.GroupedFoldInvokable;
 import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 
@@ -71,6 +75,31 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	}
 
 	/**
+	 * Applies a fold transformation on the grouped data stream, grouped by
+	 * the given key position. The {@link FoldFunction} will receive input
+	 * values based on the key value. Only input values with the same key will
+	 * go to the same folder. The user can also extend
+	 * {@link RichFoldFunction} to gain access to other features provided by
+	 * the {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 *
+	 * @param folder
+	 *            The {@link FoldFunction} that will be called for every
+	 *            element of the input values with the same key.
+	 * @param initialValue
+	 *            The initialValue passed to the folders for each key.
+	 * @return The transformed DataStream.
+	 */
+
+	@Override
+	public <R> SingleOutputStreamOperator<R, ?> fold(FoldFunction<R, OUT> folder, R initialValue) {
+
+		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
+
+		return transform("Grouped Fold", outType, new GroupedFoldInvokable<OUT, R>(clean(folder), keySelector,
+				initialValue));
+	}
+
+	/**
 	 * Applies an aggregation that sums the grouped data stream at the given
 	 * position, grouped by the given key position. Input values with the same
 	 * key will be summed.

http://git-wip-us.apache.org/repos/asf/flink/blob/f785c73e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
new file mode 100644
index 0000000..3263955
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+
+public class GroupedFoldInvokable<IN, OUT> extends StreamFoldInvokable<IN, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private KeySelector<IN, ?> keySelector;
+	private Map<Object, OUT> values;
+	private OUT folded;
+	private OUT initialValue;
+
+	public GroupedFoldInvokable(FoldFunction<OUT, IN> folder, KeySelector<IN, ?> keySelector, OUT initialValue) {
+		super(folder, initialValue);
+		this.keySelector = keySelector;
+		this.initialValue = initialValue;
+		values = new HashMap<Object, OUT>();
+	}
+
+	@Override
+	protected void fold() throws Exception {
+		Object key = nextRecord.getKey(keySelector);
+		accumulator = values.get(key);
+		nextValue = nextObject;
+		if (accumulator != null) {
+			callUserFunctionAndLogException();
+			values.put(key, folded);
+			collector.collect(folded);
+		} else {
+			values.put(key, initialValue);
+			collector.collect(initialValue);
+		}
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		folded = folder.fold(accumulator, nextValue);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f785c73e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
index 36b75f2..10912db 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
@@ -49,7 +49,7 @@ public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 	protected void callUserFunction() throws Exception {
 
 		nextValue = nextObject;
-		accumulator = folder.fold(accumulator, copy(nextValue));
+		accumulator = folder.fold(accumulator, nextValue);
 		collector.collect(accumulator);
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f785c73e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
new file mode 100644
index 0000000..c8209f9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.util.MockContext;
+
+import org.junit.Test;
+
+public class GroupedFoldInvokableTest {
+
+	private static class MyFolder implements FoldFunction<String, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String fold(String accumulator, Integer value) throws Exception {
+			return accumulator + value.toString();
+		}
+
+	}
+
+	@Test
+	public void test() {
+		GroupedFoldInvokable<Integer, String> invokable1 = new GroupedFoldInvokable<Integer, String>(
+				new MyFolder(), new KeySelector<Integer, String>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String getKey(Integer value) throws Exception {
+				return value.toString();
+			}
+		}, "100");
+
+		List<String> expected = Arrays.asList("100", "1001", "100", "1002", "100");
+		List<String> actual = MockContext.createAndExecute(invokable1,
+				Arrays.asList(1, 1, 2, 2, 3));
+
+		assertEquals(expected, actual);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f785c73e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 0b2b2b9..1663c8a 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -424,31 +424,52 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
+   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+   * using an associative reduce function.
+   */
+  def reduce(fun: (T, T) => T): DataStream[T] = {
+    if (fun == null) {
+      throw new NullPointerException("Reduce function must not be null.")
+    }
+    val reducer = new ReduceFunction[T] {
+      val cleanFun = clean(fun)
+      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+    }
+    reduce(reducer)
+  }
+
+  /**
    * Creates a new [[DataStream]] by folding the elements of this DataStream
-   * using an associative reduce function and an initial value.
+   * using an associative fold function and an initial value.
    */
-  def fold[R: TypeInformation: ClassTag](folder: FoldFunction[R,T], initialValue: R): DataStream[R] = {
+  def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[R,T]): DataStream[R] = {
     if (folder == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
-    javaStream.transform("fold", implicitly[TypeInformation[R]],
+    javaStream match {
+      case ds: GroupedDataStream[_] => javaStream.transform("fold",
+        implicitly[TypeInformation[R]], new GroupedFoldInvokable[T,R](folder, ds.getKeySelector(), initialValue))
+      case _ => javaStream.transform("fold", implicitly[TypeInformation[R]],
         new StreamFoldInvokable[T,R](folder, initialValue))
+    }
   }
 
-
   /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream
-   * using an associative reduce function.
+   * Creates a new [[DataStream]] by folding the elements of this DataStream
+   * using an associative fold function and an initial value.
    */
-  def reduce(fun: (T, T) => T): DataStream[T] = {
+  def fold[R: TypeInformation: ClassTag](initialValue: R, fun: (R,T) => R): DataStream[R] = {
     if (fun == null) {
-      throw new NullPointerException("Reduce function must not be null.")
+      throw new NullPointerException("Fold function must not be null.")
     }
-    val reducer = new ReduceFunction[T] {
+    val folder = new FoldFunction[R,T] {
       val cleanFun = clean(fun)
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+
+      def fold(acc: R, v: T) = {
+        cleanFun(acc, v)
+      }
     }
-    reduce(reducer)
+    fold(initialValue, folder)
   }
 
   /**


[4/4] flink git commit: [FLINK-1450] Several minor stream fold fixes

Posted by gy...@apache.org.
[FLINK-1450] Several minor stream fold fixes

Closes #481


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aaa231bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aaa231bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aaa231bc

Branch: refs/heads/master
Commit: aaa231bc09dabd57828836ba50ab5e74b6975e5b
Parents: 5f8ba8e
Author: Gyula Fora <gy...@apache.org>
Authored: Sat Mar 14 22:54:19 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Sun Mar 15 10:50:40 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/api/common/functions/FoldFunction.java   |  2 +-
 .../flink/api/common/functions/RichFoldFunction.java      |  2 +-
 .../apache/flink/api/java/typeutils/TypeExtractor.java    |  4 ++--
 .../apache/flink/streaming/api/datastream/DataStream.java |  2 +-
 .../flink/streaming/api/datastream/DiscretizedStream.java |  2 +-
 .../flink/streaming/api/datastream/GroupedDataStream.java |  2 +-
 .../streaming/api/datastream/WindowedDataStream.java      |  8 ++++----
 .../api/invokable/operator/GroupedFoldInvokable.java      |  8 +++++---
 .../api/invokable/operator/StreamFoldInvokable.java       |  4 ++--
 .../api/invokable/operator/windowing/WindowFolder.java    |  8 ++++----
 .../api/invokable/operator/GroupedFoldInvokableTest.java  |  4 ++--
 .../streaming/api/invokable/operator/StreamFoldTest.java  |  2 +-
 .../invokable/operator/windowing/WindowFolderTest.java    |  2 +-
 .../org/apache/flink/streaming/api/scala/DataStream.scala |  6 +++---
 .../flink/streaming/api/scala/WindowedDataStream.scala    | 10 +++++-----
 15 files changed, 34 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
index 0228dc1..a9c5b2b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/FoldFunction.java
@@ -37,7 +37,7 @@ import java.io.Serializable;
  * @param <T> Type of the initial input and the returned element
  * @param <O> Type of the elements that the group/list/stream contains
  */
-public interface FoldFunction<T, O> extends Function, Serializable {
+public interface FoldFunction<O,T> extends Function, Serializable {
 	/**
 	 * The core method of FoldFunction, combining two values into one value of the same type.
 	 * The fold function is consecutively applied to all values of a group until only a single value remains.

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
index 5abee70..3663823 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFoldFunction.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.common.functions.RichFunction;
  * @param <T> Type of the initial input and the returned element
  * @param <O> Type of the elements that the group/list/stream contains
  */
-public abstract class RichFoldFunction<T, O> extends AbstractRichFunction implements FoldFunction<T, O> {
+public abstract class RichFoldFunction<O, T> extends AbstractRichFunction implements FoldFunction<O, T> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index c5e2e8f..fdfe941 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -102,12 +102,12 @@ public class TypeExtractor {
 		return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing);
 	}
 
-	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<OUT, IN> foldInterface, TypeInformation<IN> inType)
+	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType)
 	{
 		return getFoldReturnTypes(foldInterface, inType, null, false);
 	}
 
-	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<OUT, IN> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
+	public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
 	{
 		return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false, false, inType, functionName, allowMissing);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 1ccc24d..53179d7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -535,7 +535,7 @@ public class DataStream<OUT> {
 	 *          of the input values.
 	 * @return The transformed DataStream
 	 */
-	public <R> SingleOutputStreamOperator<R, ?> fold(FoldFunction<R, OUT> folder, R initialValue) {
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) {
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
 
 		return transform("Fold", outType, new StreamFoldInvokable<OUT, R>(clean(folder), initialValue, outType));

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 5c0be93..6526aa6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -110,7 +110,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 	}
 
 	@Override
-	public <R> DiscretizedStream<R> foldWindow(FoldFunction<R, OUT> foldFunction, R initialValue,
+	public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction,
 			TypeInformation<R> outType) {
 
 		DiscretizedStream<R> out = partition(transformation).transform(

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index a57370d..e3c22da 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -91,7 +91,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 	 */
 
 	@Override
-	public <R> SingleOutputStreamOperator<R, ?> fold(FoldFunction<R, OUT> folder, R initialValue) {
+	public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) {
 
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 7c93588..c80546f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -293,11 +293,11 @@ public class WindowedDataStream<OUT> {
 	 *            The output type of the operator
 	 * @return The transformed DataStream
 	 */
-	public <R> DiscretizedStream<R> foldWindow(FoldFunction<R, OUT> foldFunction, R initialValue,
+	public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction,
 			TypeInformation<R> outType) {
 
 		return discretize(WindowTransformation.FOLDWINDOW.with(clean(foldFunction)),
-				new BasicWindowBuffer<OUT>()).foldWindow(foldFunction, initialValue, outType);
+				new BasicWindowBuffer<OUT>()).foldWindow(initialValue, foldFunction, outType);
 
 	}
 
@@ -313,11 +313,11 @@ public class WindowedDataStream<OUT> {
 	 *            Initial value given to foldFunction
 	 * @return The transformed DataStream
 	 */
-	public <R> DiscretizedStream<R> foldWindow(FoldFunction<R, OUT> foldFunction, R initialValue) {
+	public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction) {
 
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(foldFunction),
 				getType());
-		return foldWindow(foldFunction, initialValue, outType);
+		return foldWindow(initialValue, foldFunction, outType);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
index 62876b0..d4b5f7e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
@@ -32,7 +32,8 @@ public class GroupedFoldInvokable<IN, OUT> extends StreamFoldInvokable<IN, OUT>
 	private OUT folded;
 	private OUT initialValue;
 
-	public GroupedFoldInvokable(FoldFunction<OUT, IN> folder, KeySelector<IN, ?> keySelector, OUT initialValue, TypeInformation<OUT> outTypeInformation) {
+	public GroupedFoldInvokable(FoldFunction<IN, OUT> folder, KeySelector<IN, ?> keySelector,
+			OUT initialValue, TypeInformation<OUT> outTypeInformation) {
 		super(folder, initialValue, outTypeInformation);
 		this.keySelector = keySelector;
 		this.initialValue = initialValue;
@@ -49,8 +50,9 @@ public class GroupedFoldInvokable<IN, OUT> extends StreamFoldInvokable<IN, OUT>
 			values.put(key, folded);
 			collector.collect(folded);
 		} else {
-			values.put(key, initialValue);
-			collector.collect(initialValue);
+			OUT first = folded = folder.fold(outTypeSerializer.copy(initialValue), nextValue);
+			values.put(key, first);
+			collector.collect(first);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
index 205afe6..07ed022 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
@@ -25,12 +25,12 @@ import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	protected FoldFunction<OUT, IN> folder;
+	protected FoldFunction<IN, OUT> folder;
 	protected OUT accumulator;
 	protected IN nextValue;
 	protected TypeSerializer<OUT> outTypeSerializer;
 
-	public StreamFoldInvokable(FoldFunction<OUT, IN> folder, OUT initialValue, TypeInformation<OUT> outTypeInformation) {
+	public StreamFoldInvokable(FoldFunction<IN, OUT> folder, OUT initialValue, TypeInformation<OUT> outTypeInformation) {
 		super(folder);
 		this.folder = folder;
 		this.accumulator = initialValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
index ffee070..162cb3c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
@@ -30,9 +30,9 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
 public class WindowFolder<IN, OUT> extends MapInvokable<StreamWindow<IN>, StreamWindow<OUT>> {
 
 	private static final long serialVersionUID = 1L;
-	FoldFunction<OUT, IN> folder;
+	FoldFunction<IN, OUT> folder;
 
-	public WindowFolder(FoldFunction<OUT, IN> folder, OUT initialValue) {
+	public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) {
 		super(new WindowFoldFunction<IN, OUT>(folder, initialValue));
 		this.folder = folder;
 	}
@@ -42,9 +42,9 @@ public class WindowFolder<IN, OUT> extends MapInvokable<StreamWindow<IN>, Stream
 
 		private static final long serialVersionUID = 1L;
 		private OUT initialValue;
-		FoldFunction<OUT, IN> folder;
+		FoldFunction<IN, OUT> folder;
 
-		public WindowFoldFunction(FoldFunction<OUT, IN> folder, OUT initialValue) {
+		public WindowFoldFunction(FoldFunction<IN, OUT> folder, OUT initialValue) {
 			this.folder = folder;
 			this.initialValue = initialValue;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
index 176c324..01375d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
@@ -32,7 +32,7 @@ import org.junit.Test;
 
 public class GroupedFoldInvokableTest {
 
-	private static class MyFolder implements FoldFunction<String, Integer> {
+	private static class MyFolder implements FoldFunction<Integer, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -58,7 +58,7 @@ public class GroupedFoldInvokableTest {
 			}
 		}, "100", outType);
 
-		List<String> expected = Arrays.asList("100", "1001", "100", "1002", "100");
+		List<String> expected = Arrays.asList("1001","10011", "1002", "10022", "1003");
 		List<String> actual = MockContext.createAndExecute(invokable1,
 				Arrays.asList(1, 1, 2, 2, 3));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
index 0604ab1..90a133b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
 
 public class StreamFoldTest {
 
-	private static class MyFolder implements FoldFunction<String, Integer> {
+	private static class MyFolder implements FoldFunction<Integer, String> {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
index 53a6bb8..ccc01e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
@@ -33,7 +33,7 @@ public class WindowFolderTest {
 	@Test
 	public void test() {
 		StreamInvokable<StreamWindow<Integer>, StreamWindow<String>> windowReducer = new WindowFolder<Integer,String>(
-				new FoldFunction<String,Integer>() {
+				new FoldFunction<Integer, String>() {
 
 					private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 18b7bf9..59b1906 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -442,7 +442,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Creates a new [[DataStream]] by folding the elements of this DataStream
    * using an associative fold function and an initial value.
    */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[R,T]): 
+  def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]): 
   DataStream[R] = {
     if (folder == null) {
       throw new NullPointerException("Fold function must not be null.")
@@ -460,11 +460,11 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Creates a new [[DataStream]] by folding the elements of this DataStream
    * using an associative fold function and an initial value.
    */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, fun: (R,T) => R): DataStream[R] = {
+  def fold[R: TypeInformation: ClassTag](initialValue: R)(fun: (R,T) => R): DataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
-    val folder = new FoldFunction[R,T] {
+    val folder = new FoldFunction[T,R] {
       val cleanFun = clean(fun)
 
       def fold(acc: R, v: T) = {

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa231bc/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index 910f1f9..3bd5b24 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -138,12 +138,12 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    * the current window at every trigger.
    *
    */
-  def foldWindow[R: TypeInformation: ClassTag](folder: FoldFunction[R,T], initialValue: R): 
+  def foldWindow[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]): 
   WindowedDataStream[R] = {
     if (folder == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
-    javaStream.foldWindow(folder, initialValue, implicitly[TypeInformation[R]])
+    javaStream.foldWindow(initialValue, folder, implicitly[TypeInformation[R]])
   }
 
   /**
@@ -151,16 +151,16 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
    * the current window at every trigger.
    *
    */
-  def foldWindow[R: TypeInformation: ClassTag](initialValue: R, fun: (R, T) => R): 
+  def foldWindow[R: TypeInformation: ClassTag](initialValue: R)(fun: (R, T) => R): 
   WindowedDataStream[R] = {
     if (fun == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
-    val folder = new FoldFunction[R,T] {
+    val folder = new FoldFunction[T,R] {
       val cleanFun = clean(fun)
       def fold(acc: R, v: T) = { cleanFun(acc, v) }
     }
-    foldWindow(folder, initialValue)
+    foldWindow(initialValue, folder)
   }
 
   /**


[3/4] flink git commit: [FLINK-1450] Integrated fold into WindowedDataStream and added a WindowFolder test

Posted by gy...@apache.org.
[FLINK-1450] Integrated fold into WindowedDataStream and added a WindowFolder test

[FLINK-1450] Fixed StreamFoldInvokable and GroupedFoldInvokable by implementing accumulator copying to prevent mutations, and removed GroupFoldFunction.

Conflicts:
	flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala

[FLINK-1450] Fixed foldWindow on WindowedDataStreams so it doesn't fail when extracting type information from Scala types

[FLINK-1450] Fixed foldWindow so that users no longer have to supply the output TypeInformation when using it from the Java API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f8ba8ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f8ba8ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f8ba8ee

Branch: refs/heads/master
Commit: 5f8ba8ee8299e13d8732f43d0294204996664560
Parents: f785c73
Author: Akshay Dixit <ak...@gmail.com>
Authored: Sat Mar 14 04:44:45 2015 +0530
Committer: Gyula Fora <gy...@apache.org>
Committed: Sat Mar 14 20:57:13 2015 +0100

----------------------------------------------------------------------
 .../api/common/functions/GroupFoldFunction.java | 56 ----------------
 .../streaming/api/datastream/DataStream.java    |  2 +-
 .../api/datastream/DiscretizedStream.java       | 20 +++++-
 .../api/datastream/GroupedDataStream.java       |  2 +-
 .../api/datastream/WindowedDataStream.java      | 57 ++++++++++++++--
 .../operator/GroupedFoldInvokable.java          |  7 +-
 .../invokable/operator/StreamFoldInvokable.java |  9 ++-
 .../operator/windowing/WindowFolder.java        | 69 ++++++++++++++++++++
 .../streaming/api/windowing/WindowUtils.java    |  2 +-
 .../operator/GroupedFoldInvokableTest.java      |  6 +-
 .../api/invokable/operator/StreamFoldTest.java  |  7 +-
 .../operator/windowing/WindowFolderTest.java    | 61 +++++++++++++++++
 .../flink/streaming/api/scala/DataStream.scala  | 10 +--
 .../api/scala/WindowedDataStream.scala          | 32 ++++++++-
 14 files changed, 259 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupFoldFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupFoldFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupFoldFunction.java
deleted file mode 100644
index fb59c89..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupFoldFunction.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.functions;
-
-import java.io.Serializable;
-
-import org.apache.flink.util.Collector;
-
-/**
- * The interface for group fold functions. GroupFoldFunctions process groups of elements.
- * They may aggregate them to a single value, or produce multiple result values for each group.
- * The group may be defined by sharing a common grouping key, or the group may simply be
- * all elements of a data set.
- * <p>
- * For a fold function that works incrementally by combining always two elements, see
- * {@link FoldFunction}.
- * <p>
- * The basic syntax for using a grouped GroupFoldFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- *
- * X initialValue = ...;
- * DataSet<X> result = input.groupBy(<key-definition>).foldGroup(new MyGroupFoldFunction(), initialValue);
- * </blockquote></pre>
- *
- * @param <T> Type of the initial input and the returned element
- * @param <O> Type of the elements that the group/list/stream contains
- */
-public interface GroupFoldFunction<T, O> extends Function, Serializable {
-	/**
-	 * The fold method. The function receives one call per group of elements.
-	 *
-	 * @param values All records that belong to the given input key.
-	 * @param out The collector to hand results to.
-	 *
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
-	void fold(Iterable<T> values, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 29d7aba..1ccc24d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -538,7 +538,7 @@ public class DataStream<OUT> {
 	public <R> SingleOutputStreamOperator<R, ?> fold(FoldFunction<R, OUT> folder, R initialValue) {
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
 
-		return transform("Fold", outType, new StreamFoldInvokable<OUT, R>(clean(folder), initialValue));
+		return transform("Fold", outType, new StreamFoldInvokable<OUT, R>(clean(folder), initialValue, outType));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 451acf0..5c0be93 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -29,6 +30,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.WindowMapFunction;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFolder;
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMapper;
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger;
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartitioner;
@@ -41,8 +43,9 @@ import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation
 /**
  * A {@link DiscretizedStream} represents a data stream that has been divided
  * into windows (predefined chunks). User defined function such as
- * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow()} or aggregations
- * can be applied to the windows.
+ * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow()},
+ * {@link #foldWindow(FoldFunction, Object)} or aggregations can be
+ * applied to the windows.
  * 
  * @param <OUT>
  *            The output type of the {@link DiscretizedStream}
@@ -106,6 +109,17 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 		return out;
 	}
 
+	@Override
+	public <R> DiscretizedStream<R> foldWindow(FoldFunction<R, OUT> foldFunction, R initialValue,
+			TypeInformation<R> outType) {
+
+		DiscretizedStream<R> out = partition(transformation).transform(
+				WindowTransformation.FOLDWINDOW, "Fold Window", outType,
+				new WindowFolder<OUT, R>(discretizedStream.clean(foldFunction), initialValue))
+				.merge();
+		return out;
+	}
+
 	private <R> DiscretizedStream<R> transform(WindowTransformation transformation,
 			String operatorName, TypeInformation<R> retType,
 			StreamInvokable<StreamWindow<OUT>, StreamWindow<R>> invokable) {
@@ -126,7 +140,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 			out.isPartitioned = true;
 
 			return out;
-		} else if (transformation != WindowTransformation.MAPWINDOW
+		} else if (transformation == WindowTransformation.REDUCEWINDOW
 				&& parallelism != discretizedStream.getExecutionEnvironment()
 						.getDegreeOfParallelism()) {
 			DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(),

http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 50ae542..a57370d 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -96,7 +96,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
 		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType());
 
 		return transform("Grouped Fold", outType, new GroupedFoldInvokable<OUT, R>(clean(folder), keySelector,
-				initialValue));
+				initialValue, outType));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 8199b22..7c93588 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -18,7 +18,9 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFoldFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -28,6 +30,7 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.RichWindowMapFunction;
 import org.apache.flink.streaming.api.function.WindowMapFunction;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
@@ -274,6 +277,50 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
+	 * Applies a fold transformation on the windowed data stream by folding the
+	 * current window at every trigger. The user can also extend the
+	 * {@link RichFoldFunction} to gain access to other features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 * This version of foldWindow uses user supplied type information for
+	 * serialization. Use this only when the system is unable to detect type
+	 * information.
+	 * 
+	 * @param foldFunction
+	 *            The fold function that will be applied to the windows.
+	 * @param initialValue
+	 *            Initial value given to foldFunction
+	 * @param outType
+	 *            The output type of the operator
+	 * @return The transformed DataStream
+	 */
+	public <R> DiscretizedStream<R> foldWindow(FoldFunction<R, OUT> foldFunction, R initialValue,
+			TypeInformation<R> outType) {
+
+		return discretize(WindowTransformation.FOLDWINDOW.with(clean(foldFunction)),
+				new BasicWindowBuffer<OUT>()).foldWindow(foldFunction, initialValue, outType);
+
+	}
+
+	/**
+	 * Applies a fold transformation on the windowed data stream by folding the
+	 * current window at every trigger. The user can also extend the
+	 * {@link RichFoldFunction} to gain access to other features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 * 
+	 * @param foldFunction
+	 *            The fold function that will be applied to the windows.
+	 * @param initialValue
+	 *            Initial value given to foldFunction
+	 * @return The transformed DataStream
+	 */
+	public <R> DiscretizedStream<R> foldWindow(FoldFunction<R, OUT> foldFunction, R initialValue) {
+
+		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(foldFunction),
+				getType());
+		return foldWindow(foldFunction, initialValue, outType);
+	}
+
+	/**
 	 * Applies a mapWindow transformation on the windowed data stream by calling
 	 * the mapWindow function on the window at every trigger. In contrast with
 	 * the standard binary reducer, with mapWindow allows the user to access all
@@ -327,7 +374,7 @@ public class WindowedDataStream<OUT> {
 		TypeInformation<WindowEvent<OUT>> bufferEventType = new TupleTypeInfo(WindowEvent.class,
 				getType(), BasicTypeInfo.INT_TYPE_INFO);
 
-		int parallelism = getDiscretizerParallelism();
+		int parallelism = getDiscretizerParallelism(transformation);
 
 		return new DiscretizedStream<OUT>(dataStream
 				.transform("Stream Discretizer", bufferEventType, discretizer)
@@ -338,11 +385,11 @@ public class WindowedDataStream<OUT> {
 
 	}
 
-	private int getDiscretizerParallelism() {
+	private int getDiscretizerParallelism(WindowTransformation transformation) {
 		return isLocal
-				|| WindowUtils.isParallelPolicy(getTrigger(), getEviction(),
-						dataStream.getParallelism()) || (discretizerKey != null) ? dataStream.environment
-				.getDegreeOfParallelism() : 1;
+				|| (transformation == WindowTransformation.REDUCEWINDOW && WindowUtils
+						.isParallelPolicy(getTrigger(), getEviction(), dataStream.getParallelism()))
+				|| (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
 	}
 
 	private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {

http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
index 3263955..62876b0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokable.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 
 public class GroupedFoldInvokable<IN, OUT> extends StreamFoldInvokable<IN, OUT> {
@@ -31,8 +32,8 @@ public class GroupedFoldInvokable<IN, OUT> extends StreamFoldInvokable<IN, OUT>
 	private OUT folded;
 	private OUT initialValue;
 
-	public GroupedFoldInvokable(FoldFunction<OUT, IN> folder, KeySelector<IN, ?> keySelector, OUT initialValue) {
-		super(folder, initialValue);
+	public GroupedFoldInvokable(FoldFunction<OUT, IN> folder, KeySelector<IN, ?> keySelector, OUT initialValue, TypeInformation<OUT> outTypeInformation) {
+		super(folder, initialValue, outTypeInformation);
 		this.keySelector = keySelector;
 		this.initialValue = initialValue;
 		values = new HashMap<Object, OUT>();
@@ -55,7 +56,7 @@ public class GroupedFoldInvokable<IN, OUT> extends StreamFoldInvokable<IN, OUT>
 
 	@Override
 	protected void callUserFunction() throws Exception {
-		folded = folder.fold(accumulator, nextValue);
+		folded = folder.fold(outTypeSerializer.copy(accumulator), nextValue);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
index 10912db..205afe6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldInvokable.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 
 public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
@@ -26,11 +28,13 @@ public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 	protected FoldFunction<OUT, IN> folder;
 	protected OUT accumulator;
 	protected IN nextValue;
+	protected TypeSerializer<OUT> outTypeSerializer;
 
-	public StreamFoldInvokable(FoldFunction<OUT, IN> folder, OUT initialValue) {
+	public StreamFoldInvokable(FoldFunction<OUT, IN> folder, OUT initialValue, TypeInformation<OUT> outTypeInformation) {
 		super(folder);
 		this.folder = folder;
 		this.accumulator = initialValue;
+		this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
 	}
 
 	@Override
@@ -49,7 +53,7 @@ public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 	protected void callUserFunction() throws Exception {
 
 		nextValue = nextObject;
-		accumulator = folder.fold(accumulator, nextValue);
+		accumulator = folder.fold(outTypeSerializer.copy(accumulator), nextValue);
 		collector.collect(accumulator);
 
 	}
@@ -61,5 +65,4 @@ public class StreamFoldInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 			callUserFunctionAndLogException();
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
new file mode 100644
index 0000000..ffee070
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+/**
+ * This invokable is used to apply foldWindow transformations on
+ * {@link WindowedDataStream}s.
+ */
+public class WindowFolder<IN, OUT> extends MapInvokable<StreamWindow<IN>, StreamWindow<OUT>> {
+
+	private static final long serialVersionUID = 1L;
+	FoldFunction<OUT, IN> folder;
+
+	public WindowFolder(FoldFunction<OUT, IN> folder, OUT initialValue) {
+		super(new WindowFoldFunction<IN, OUT>(folder, initialValue));
+		this.folder = folder;
+	}
+
+	private static class WindowFoldFunction<IN, OUT> implements
+			MapFunction<StreamWindow<IN>, StreamWindow<OUT>> {
+
+		private static final long serialVersionUID = 1L;
+		private OUT initialValue;
+		FoldFunction<OUT, IN> folder;
+
+		public WindowFoldFunction(FoldFunction<OUT, IN> folder, OUT initialValue) {
+			this.folder = folder;
+			this.initialValue = initialValue;
+		}
+
+		@Override
+		public StreamWindow<OUT> map(StreamWindow<IN> window) throws Exception {
+			StreamWindow<OUT> outputWindow = new StreamWindow<OUT>(window.windowID);
+			outputWindow.numberOfParts = window.numberOfParts;
+
+			if (!window.isEmpty()) {
+				OUT accumulator = initialValue;
+				for (int i = 0; i < window.size(); i++) {
+					accumulator = folder.fold(accumulator, window.get(i));
+				}
+				outputWindow.add(accumulator);
+			}
+			return outputWindow;
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
index 0649b4d..8411d31 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 public class WindowUtils {
 
 	public enum WindowTransformation {
-		REDUCEWINDOW, MAPWINDOW, NONE;
+		REDUCEWINDOW, MAPWINDOW, FOLDWINDOW, NONE;
 		private Function UDF;
 
 		public WindowTransformation with(Function UDF) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
index c8209f9..176c324 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
@@ -23,7 +23,9 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.util.MockContext;
 
 import org.junit.Test;
@@ -43,6 +45,8 @@ public class GroupedFoldInvokableTest {
 
 	@Test
 	public void test() {
+		TypeInformation<String> outType = TypeExtractor.getForObject("A string");
+
 		GroupedFoldInvokable<Integer, String> invokable1 = new GroupedFoldInvokable<Integer, String>(
 				new MyFolder(), new KeySelector<Integer, String>() {
 
@@ -52,7 +56,7 @@ public class GroupedFoldInvokableTest {
 			public String getKey(Integer value) throws Exception {
 				return value.toString();
 			}
-		}, "100");
+		}, "100", outType);
 
 		List<String> expected = Arrays.asList("100", "1001", "100", "1002", "100");
 		List<String> actual = MockContext.createAndExecute(invokable1,

http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
index b07191c..0604ab1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
@@ -23,12 +23,14 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.util.MockContext;
 import org.junit.Test;
 
 public class StreamFoldTest {
 
-	private static class MyFolder implements FoldFunction<String, Integer>{
+	private static class MyFolder implements FoldFunction<String, Integer> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -40,8 +42,9 @@ public class StreamFoldTest {
 
 	@Test
 	public void test() {
+		TypeInformation<String> outType = TypeExtractor.getForObject("A string");
 		StreamFoldInvokable<Integer, String> invokable1 = new StreamFoldInvokable<Integer, String>(
-				new MyFolder(), "");
+				new MyFolder(), "", outType);
 
 		List<String> expected = Arrays.asList("1","11","112","1123","11233");
 		List<String> actual = MockContext.createAndExecute(invokable1,

http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
new file mode 100644
index 0000000..53a6bb8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFolderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class WindowFolderTest {
+
+	@Test
+	public void test() {
+		StreamInvokable<StreamWindow<Integer>, StreamWindow<String>> windowReducer = new WindowFolder<Integer,String>(
+				new FoldFunction<String,Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public String fold(String accumulator, Integer value) throws Exception {
+						return accumulator + value.toString();
+					}
+				}, "");
+
+		List<StreamWindow<Integer>> input = new ArrayList<StreamWindow<Integer>>();
+		input.add(StreamWindow.fromElements(1, 2, 3));
+		input.add(new StreamWindow<Integer>());
+		input.add(StreamWindow.fromElements(-1));
+
+		List<StreamWindow<String>> expected = new ArrayList<StreamWindow<String>>();
+		expected.add(StreamWindow.fromElements("123"));
+		expected.add(new StreamWindow<String>());
+		expected.add(StreamWindow.fromElements("-1"));
+
+		List<StreamWindow<String>> output = MockContext.createAndExecute(windowReducer, input);
+
+		assertEquals(expected, output);
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 1663c8a..18b7bf9 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -424,7 +424,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Creates a new [[DataStream]] by reducing the elements of this DataStream
+  * Creates a new [[DataStream]] by reducing the elements of this DataStream
    * using an associative reduce function.
    */
   def reduce(fun: (T, T) => T): DataStream[T] = {
@@ -442,15 +442,17 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * Creates a new [[DataStream]] by folding the elements of this DataStream
    * using an associative fold function and an initial value.
    */
-  def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[R,T]): DataStream[R] = {
+  def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[R,T]): 
+  DataStream[R] = {
     if (folder == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
     javaStream match {
       case ds: GroupedDataStream[_] => javaStream.transform("fold",
-        implicitly[TypeInformation[R]], new GroupedFoldInvokable[T,R](folder, ds.getKeySelector(), initialValue))
+        implicitly[TypeInformation[R]], new GroupedFoldInvokable[T,R](folder, ds.getKeySelector(), 
+            initialValue, implicitly[TypeInformation[R]]))
       case _ => javaStream.transform("fold", implicitly[TypeInformation[R]],
-        new StreamFoldInvokable[T,R](folder, initialValue))
+        new StreamFoldInvokable[T,R](folder, initialValue, implicitly[TypeInformation[R]]))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5f8ba8ee/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index b072197..910f1f9 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -22,7 +22,7 @@ import scala.Array.canBuildFrom
 import scala.collection.JavaConversions.iterableAsScalaIterable
 import scala.reflect.ClassTag
 
-import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
@@ -134,6 +134,36 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
   }
 
   /**
+   * Applies a fold transformation on the windowed data stream by folding
+   * the current window at every trigger.
+   *
+   */
+  def foldWindow[R: TypeInformation: ClassTag](folder: FoldFunction[R,T], initialValue: R): 
+  WindowedDataStream[R] = {
+    if (folder == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+    javaStream.foldWindow(folder, initialValue, implicitly[TypeInformation[R]])
+  }
+
+  /**
+   * Applies a fold transformation on the windowed data stream by folding
+   * the current window at every trigger.
+   *
+   */
+  def foldWindow[R: TypeInformation: ClassTag](initialValue: R, fun: (R, T) => R): 
+  WindowedDataStream[R] = {
+    if (fun == null) {
+      throw new NullPointerException("Fold function must not be null.")
+    }
+    val folder = new FoldFunction[R,T] {
+      val cleanFun = clean(fun)
+      def fold(acc: R, v: T) = { cleanFun(acc, v) }
+    }
+    foldWindow(folder, initialValue)
+  }
+
+  /**
    * Applies a mapWindow transformation on the windowed data stream by calling the mapWindow
    * method on current window at every trigger. In contrast with the simple binary reduce 
    * operator, mapWindow exposes the whole window through the Iterable interface.