You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/22 22:12:16 UTC

[9/9] flink git commit: [FLINK-5582] [streaming] Add 'AggregateFunction' and 'AggregatingState'.

[FLINK-5582] [streaming] Add 'AggregateFunction' and 'AggregatingState'.

The AggregateFunction implements a very flexible interface for distributive aggregations.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09380e49
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09380e49
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09380e49

Branch: refs/heads/master
Commit: 09380e49256bff924734b9a932808e0f4daa7e5c
Parents: 2b3fd39
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 10 19:24:49 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jan 22 21:53:40 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBAggregatingState.java          | 205 ++++++++++++++
 .../state/RocksDBKeyedStateBackend.java         |  11 +
 .../state/RocksDBAggregatingStateTest.java      | 248 +++++++++++++++++
 .../state/RocksDBStateBackendConfigTest.java    |   4 +-
 .../api/common/functions/AggregateFunction.java |  94 +++++++
 .../api/common/state/AggregatingState.java      |  45 +++
 .../state/AggregatingStateDescriptor.java       | 145 ++++++++++
 .../flink/api/common/state/StateBackend.java    |  13 +-
 .../flink/api/common/state/StateDescriptor.java |  10 +-
 .../flink/api/java/typeutils/TypeExtractor.java |  23 ++
 .../apache/flink/core/fs/CloseableRegistry.java |   2 +
 .../memory/ByteArrayInputStreamWithPos.java     |   2 +
 .../state/AbstractKeyedStateBackend.java        |  23 ++
 .../state/heap/HeapAggregatingState.java        | 147 ++++++++++
 .../state/heap/HeapKeyedStateBackend.java       |  13 +
 .../runtime/state/heap/HeapReducingState.java   |   2 +-
 .../internal/InternalAggregatingState.java      |  33 +++
 .../state/internal/InternalReducingState.java   |   2 +-
 .../runtime/state/SerializationProxiesTest.java |   3 +-
 .../state/heap/HeapAggregatingStateTest.java    | 274 +++++++++++++++++++
 .../api/datastream/WindowedStream.java          | 177 +++++++++++-
 .../AggregateApplyAllWindowFunction.java        |  56 ++++
 .../windowing/AggregateApplyWindowFunction.java |  52 ++++
 23 files changed, 1575 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
new file mode 100644
index 0000000..1f306b4
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * An {@link AggregatingState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key
+ * @param <N> The type of the namespace
+ * @param <T> The type of the values that aggregated into the state
+ * @param <ACC> The type of the value stored in the state (the accumulator type)
+ * @param <R> The type of the value returned from the state
+ */
+public class RocksDBAggregatingState<K, N, T, ACC, R>
+	extends AbstractRocksDBState<K, N, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>, ACC>
+	implements InternalAggregatingState<N, T, R> {
+
+	/** Serializer for the values */
+	private final TypeSerializer<ACC> valueSerializer;
+
+	/** User-specified aggregation function */
+	private final AggregateFunction<T, ACC, R> aggFunction;
+
+	/**
+	 * We disable writes to the write-ahead-log here. We can't have these in the base class
+	 * because JNI segfaults for some reason if they are.
+	 */
+	private final WriteOptions writeOptions;
+
+	/**
+	 * Creates a new {@code RocksDBFoldingState}.
+	 *
+	 * @param namespaceSerializer
+	 *             The serializer for the namespace.
+	 * @param stateDesc              
+	 *             The state identifier for the state. This contains the state name and aggregation function.
+	 */
+	public RocksDBAggregatingState(
+			ColumnFamilyHandle columnFamily,
+			TypeSerializer<N> namespaceSerializer,
+			AggregatingStateDescriptor<T, ACC, R> stateDesc,
+			RocksDBKeyedStateBackend<K> backend) {
+
+		super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+		this.valueSerializer = stateDesc.getSerializer();
+		this.aggFunction = stateDesc.getAggregateFunction();
+
+		writeOptions = new WriteOptions();
+		writeOptions.setDisableWAL(true);
+	}
+
+	@Override
+	public R get() throws IOException {
+		try {
+			// prepare the current key and namespace for RocksDB lookup
+			writeCurrentKeyWithGroupAndNamespace();
+			final byte[] key = keySerializationStream.toByteArray();
+
+			// get the current value
+			final byte[] valueBytes = backend.db.get(columnFamily, key);
+
+			if (valueBytes == null) {
+				return null;
+			}
+
+			ACC accumulator = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+			return aggFunction.getResult(accumulator);
+		}
+		catch (IOException | RocksDBException e) {
+			throw new IOException("Error while retrieving value from RocksDB", e);
+		}
+	}
+
+	@Override
+	public void add(T value) throws IOException {
+		try {
+			// prepare the current key and namespace for RocksDB lookup
+			writeCurrentKeyWithGroupAndNamespace();
+			final byte[] key = keySerializationStream.toByteArray();
+			keySerializationStream.reset();
+
+			// get the current value
+			final byte[] valueBytes = backend.db.get(columnFamily, key);
+
+			// deserialize the current accumulator, or create a blank one
+			final ACC accumulator = valueBytes == null ?
+					aggFunction.createAccumulator() :
+					valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+
+			// aggregate the value into the accumulator
+			aggFunction.add(value, accumulator);
+
+			// serialize the new accumulator
+			final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+			valueSerializer.serialize(accumulator, out);
+
+			// write the new value to RocksDB
+			backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+		}
+		catch (IOException | RocksDBException e) {
+			throw new IOException("Error while adding value to RocksDB", e);
+		}
+	}
+
+	@Override
+	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+		if (sources == null || sources.isEmpty()) {
+			return;
+		}
+
+		// cache key and namespace
+		final K key = backend.getCurrentKey();
+		final int keyGroup = backend.getCurrentKeyGroupIndex();
+
+		try {
+			ACC current = null;
+
+			// merge the sources to the target
+			for (N source : sources) {
+				if (source != null) {
+					writeKeyWithGroupAndNamespace(
+							keyGroup, key, source,
+							keySerializationStream, keySerializationDataOutputView);
+					
+					final byte[] sourceKey = keySerializationStream.toByteArray();
+					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+
+					if (valueBytes != null) {
+						ACC value = valueSerializer.deserialize(
+								new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+
+						if (current != null) {
+							current = aggFunction.merge(current, value);
+						}
+						else {
+							current = value;
+						}
+					}
+				}
+			}
+
+			// if something came out of merging the sources, merge it or write it to the target
+			if (current != null) {
+				// create the target full-binary-key 
+				writeKeyWithGroupAndNamespace(
+						keyGroup, key, target,
+						keySerializationStream, keySerializationDataOutputView);
+
+				final byte[] targetKey = keySerializationStream.toByteArray();
+				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
+
+				if (targetValueBytes != null) {
+					// target also had a value, merge
+					ACC value = valueSerializer.deserialize(
+							new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
+
+					current = aggFunction.merge(current, value);
+				}
+
+				// serialize the resulting value
+				keySerializationStream.reset();
+				valueSerializer.serialize(current, keySerializationDataOutputView);
+
+				// write the resulting value
+				backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
+			}
+		}
+		catch (Exception e) {
+			throw new Exception("Error while merging state in RocksDB", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 5a6d102..21ef8c2 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -18,6 +18,7 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -48,6 +49,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
@@ -842,6 +844,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
+	protected <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+			TypeSerializer<N> namespaceSerializer,
+			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
+
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
+		return new RocksDBAggregatingState<>(columnFamily, namespaceSerializer, stateDesc, this);
+	}
+
+	@Override
 	protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
 			TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
new file mode 100644
index 0000000..983e569
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static java.util.Arrays.asList;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link InternalAggregatingState} implementation on top of RocksDB.
+ */
+public class RocksDBAggregatingStateTest {
+
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testAddAndGet() throws Exception {
+
+		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+				new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
+		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
+		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
+
+		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
+
+		try {
+			InternalAggregatingState<VoidNamespace, Long, Long> state =
+					keyedBackend.createAggregatingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+			state.add(17L);
+			state.add(11L);
+			assertEquals(28L, state.get().longValue());
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertNull(state.get());
+			state.add(1L);
+			state.add(2L);
+
+			keyedBackend.setCurrentKey("def");
+			assertEquals(28L, state.get().longValue());
+			state.clear();
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			state.add(3L);
+			state.add(2L);
+			state.add(1L);
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertEquals(9L, state.get().longValue());
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testMerging() throws Exception {
+
+		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+				new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
+		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		final TimeWindow win1 = new TimeWindow(1000, 2000);
+		final TimeWindow win2 = new TimeWindow(2000, 3000);
+		final TimeWindow win3 = new TimeWindow(3000, 4000);
+
+		final Long expectedResult = 165L;
+
+		final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
+		backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
+
+		final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
+
+		try {
+			InternalAggregatingState<TimeWindow, Long, Long> state =
+					keyedBackend.createAggregatingState(new TimeWindow.Serializer(), stateDescr);
+
+			// populate the different namespaces
+			//  - abc spreads the values over three namespaces
+			//  - def spreads teh values over two namespaces (one empty)
+			//  - ghi is empty
+			//  - jkl has all elements already in the target namespace
+			//  - mno has all elements already in one source namespace
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(win1);
+			state.add(33L);
+			state.add(55L);
+
+			state.setCurrentNamespace(win2);
+			state.add(22L);
+			state.add(11L);
+
+			state.setCurrentNamespace(win3);
+			state.add(44L);
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(win1);
+			state.add(11L);
+			state.add(44L);
+
+			state.setCurrentNamespace(win3);
+			state.add(22L);
+			state.add(55L);
+			state.add(33L);
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(win1);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(win3);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("abc");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("def");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("ghi");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("jkl");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("mno");
+			state.mergeNamespaces(win1, asList(win2, win3));
+			state.setCurrentNamespace(win1);
+			assertEquals(expectedResult, state.get());
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception {
+		return (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend(
+						new DummyEnvironment("TestTask", 1, 0),
+						new JobID(),
+						"test-op",
+						StringSerializer.INSTANCE,
+						16,
+						new KeyGroupRange(2, 3),
+						mock(TaskKvStateRegistry.class));
+	}
+
+	//  test functions
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static class AddingFunction implements AggregateFunction<Long, MutableLong, Long> {
+
+		@Override
+		public MutableLong createAccumulator() {
+			return new MutableLong();
+		}
+
+		@Override
+		public void add(Long value, MutableLong accumulator) {
+			accumulator.value += value;
+		}
+
+		@Override
+		public Long getResult(MutableLong accumulator) {
+			return accumulator.value;
+		}
+
+		@Override
+		public MutableLong merge(MutableLong a, MutableLong b) {
+			a.value += b.value;
+			return a;
+		}
+	}
+
+	private static final class MutableLong {
+		long value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index bc0777c..9524352 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -351,11 +351,11 @@ public class RocksDBStateBackendConfigTest {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private static Environment getMockEnvironment() {
+	static Environment getMockEnvironment() {
 		return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) });
 	}
 
-	private static Environment getMockEnvironment(File[] tempDirs) {
+	static Environment getMockEnvironment(File[] tempDirs) {
 		final String[] tempDirStrings = new String[tempDirs.length];
 		for (int i = 0; i < tempDirs.length; i++) {
 			tempDirStrings[i] = tempDirs[i].getAbsolutePath();

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
new file mode 100644
index 0000000..507be63
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * 
+ * <p>Aggregation functions must be {@link Serializable} because they are sent around
+ * between distributed processes during distributed execution.
+ * 
+ * <p>An example how to use this interface is below:
+ * 
+ * <pre>{@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ *     long count;
+ *     long sum;
+ * }
+ * 
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ * 
+ *     public AverageAccumulator createAccumulator() {
+ *         return new AverageAccumulator();
+ *     }
+ * 
+ *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
+ *         a.count += b.count;
+ *         a.sum += b.sum;
+ *         return a;
+ *     }
+ * 
+ *     public void add(Integer value, AverageAccumulator acc) {
+ *         acc.sum += value;
+ *         acc.count++;
+ *     }
+ * 
+ *     public Double getResult(AverageAccumulator acc) {
+ *         return acc.sum / (double) acc.count;
+ *     }
+ * }
+ * 
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 'average'
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ *     public AverageAccumulator createAccumulator() {
+ *         return new AverageAccumulator();
+ *     }
+ *
+ *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
+ *         a.count += b.count;
+ *         a.sum += b.sum;
+ *         return a;
+ *     }
+ *
+ *     public void add(Datum value, AverageAccumulator acc) {
+ *         acc.count += value.getWeight();
+ *         acc.sum += value.getValue();
+ *     }
+ *
+ *     public Double getResult(AverageAccumulator acc) {
+ *         return acc.sum / (double) acc.count;
+ *     }
+ * }
+ * }</pre>
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+	ACC createAccumulator();
+
+	void add(IN value, ACC accumulator);
+
+	OUT getResult(ACC accumulator);
+
+	ACC merge(ACC a, ACC b);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
new file mode 100644
index 0000000..c679285
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AggregateFunction;
+
+/**
+ * {@link State} interface for aggregating state, based on an 
+ * {@link AggregateFunction}. Elements that are added to this type of state will
+ * be eagerly pre-aggregated using a given {@code AggregateFunction}.
+ * 
+ * <p>The state holds internally always the accumulator type of the {@code AggregateFunction}.
+ * When accessing the result of the state, the function's 
+ * {@link AggregateFunction#getResult(Object)} method.
+ *
+ * <p>The state is accessed and modified by user functions, and checkpointed consistently
+ * by the system as part of the distributed snapshots.
+ * 
+ * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * automatically supplied by the system, so the function always sees the value mapped to the
+ * key of the current element. That way, the system can handle stream and state partitioning
+ * consistently together.
+ * 
+ * @param <IN> Type of the value added to the state.
+ * @param <OUT> Type of the value extracted from the state.
+ */
+@PublicEvolving
+public interface AggregatingState<IN, OUT> extends MergingState<IN, OUT> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
new file mode 100644
index 0000000..abdac91
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A StateDescriptor for {@link AggregatingState}.
+ *
+ * <p>The type internally stored in the state is the type of the {@code Accumulator} of the
+ * {@code AggregateFunction}.
+ * 
+ * @param <IN> The type of the values that are added to the state.
+ * @param <ACC> The type of the accumulator (intermediate aggregation state).
+ * @param <OUT> The type of the values that are returned from the state.
+ */
+@PublicEvolving
+public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AggregatingState<IN, OUT>, ACC> {
+	private static final long serialVersionUID = 1L;
+
+	/** The aggregation function for the state */
+	private final AggregateFunction<IN, ACC, OUT> aggFunction;
+
+	/**
+	 * Creates a new state descriptor with the given name, function, and type.
+	 *
+	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
+	 * consider using the {@link #AggregatingStateDescriptor(String, AggregateFunction, TypeInformation)} constructor.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param aggFunction The {@code AggregateFunction} used to aggregate the state.   
+	 * @param stateType The type of the accumulator. The accumulator is stored in the state.
+	 */
+	public AggregatingStateDescriptor(
+			String name,
+			AggregateFunction<IN, ACC, OUT> aggFunction,
+			Class<ACC> stateType) {
+
+		super(name, stateType, null);
+		this.aggFunction = checkNotNull(aggFunction);
+	}
+
+	/**
+	 * Creates a new {@code ReducingStateDescriptor} with the given name and default value.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param aggFunction The {@code AggregateFunction} used to aggregate the state.
+	 * @param stateType The type of the accumulator. The accumulator is stored in the state.
+	 */
+	public AggregatingStateDescriptor(
+			String name,
+			AggregateFunction<IN, ACC, OUT> aggFunction,
+			TypeInformation<ACC> stateType) {
+
+		super(name, stateType, null);
+		this.aggFunction = checkNotNull(aggFunction);
+	}
+
+	/**
+	 * Creates a new {@code ValueStateDescriptor} with the given name and default value.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param aggFunction The {@code AggregateFunction} used to aggregate the state.
+	 * @param typeSerializer The serializer for the accumulator. The accumulator is stored in the state.
+	 */
+	public AggregatingStateDescriptor(
+			String name,
+			AggregateFunction<IN, ACC, OUT> aggFunction,
+			TypeSerializer<ACC> typeSerializer) {
+
+		super(name, typeSerializer, null);
+		this.aggFunction = checkNotNull(aggFunction);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public AggregatingState<IN, OUT> bind(StateBackend stateBackend) throws Exception {
+		return stateBackend.createAggregatingState(this);
+	}
+
+	/**
+	 * Returns the aggregate function to be used for the state.
+	 */
+	public AggregateFunction<IN, ACC, OUT> getAggregateFunction() {
+		return aggFunction;
+	}
+
+	@Override
+	public Type getType() {
+		return Type.AGGREGATING;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		else if (o != null && getClass() == o.getClass()) {
+			AggregatingStateDescriptor<?, ?, ?> that = (AggregatingStateDescriptor<?, ?, ?>) o;
+			return serializer.equals(that.serializer) && name.equals(that.name);
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		int result = serializer.hashCode();
+		result = 31 * result + name.hashCode();
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "AggregatingStateDescriptor{" +
+				"serializer=" + serializer +
+				", aggFunction=" + aggFunction +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
index d396a31..f9d1af7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
@@ -47,11 +47,22 @@ public interface StateBackend {
 	 * Creates and returns a new {@link ReducingState}.
 	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
 	 *
-	 * @param <T> The type of the values that the {@code ListState} can store.
+	 * @param <T> The type of the values that the {@code ReducingState} can store.
 	 */
 	<T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception;
 
 	/**
+	 * Creates and returns a new {@link AggregatingState}.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <IN> The type of the values that go into the aggregating state
+	 * @param <ACC> The type of the values that are stored in the aggregating state   
+	 * @param <OUT> The type of the values that come out of the aggregating state   
+	 */
+	<IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
+			AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception;
+
+	/**
 	 * Creates and returns a new {@link FoldingState}.
 	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index ad9d417..b901d03 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -49,13 +49,19 @@ import static java.util.Objects.requireNonNull;
 @PublicEvolving
 public abstract class StateDescriptor<S extends State, T> implements Serializable {
 
-	// Do not change the order of the elements in this enum, ordinal is used in serialization
+	/**
+	 * An enumeration of the types of supported states. Used to identify the state type
+	 * when writing and restoring checkpoints and savepoints.
+	 */
+	// IMPORTANT: Do not change the order of the elements in this enum, ordinal is used in serialization
 	public enum Type {
-		@Deprecated UNKNOWN, VALUE, LIST, REDUCING, FOLDING
+		@Deprecated UNKNOWN, VALUE, LIST, REDUCING, FOLDING, AGGREGATING
 	}
 
 	private static final long serialVersionUID = 1L;
 
+	// ------------------------------------------------------------------------
+
 	/** Name that uniquely identifies state created from this StateDescriptor. */
 	protected final String name;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a2664f9..2b9eed9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.ClassUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -189,6 +190,28 @@ public class TypeExtractor {
 	}
 
 	@PublicEvolving
+	public static <IN, ACC> TypeInformation<ACC> getAggregateFunctionAccumulatorType(
+			AggregateFunction<IN, ACC, ?> function,
+			TypeInformation<IN> inType,
+			String functionName,
+			boolean allowMissing)
+	{
+		return getUnaryOperatorReturnType(
+			function, AggregateFunction.class, 0, 1, inType, functionName, allowMissing);
+	}
+
+	@PublicEvolving
+	public static <IN, OUT> TypeInformation<OUT> getAggregateFunctionReturnType(
+			AggregateFunction<IN, ?, OUT> function,
+			TypeInformation<IN> inType,
+			String functionName,
+			boolean allowMissing)
+	{
+		return getUnaryOperatorReturnType(
+				function, AggregateFunction.class, 0, 2, inType, functionName, allowMissing);
+	}
+
+	@PublicEvolving
 	public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType) {
 		return getMapPartitionReturnTypes(mapPartitionInterface, inType, null, false);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
index 81ba7ab..0d4ea0c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.core.fs;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.AbstractCloseableRegistry;
 
 import java.io.Closeable;
@@ -32,6 +33,7 @@ import java.util.Map;
  * <p>
  * All methods in this class are thread-safe.
  */
+@Internal
 public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Object> {
 
 	private static final Object DUMMY = new Object();

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
index 46b82c7..dd381a4 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.core.memory;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -26,6 +27,7 @@ import java.io.InputStream;
 /**
  * Un-synchronized stream similar to Java's ByteArrayInputStream that also exposes the current position.
  */
+@Internal
 public class ByteArrayInputStreamWithPos extends InputStream {
 
 	protected byte[] buffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index fcca77a..c8e0d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
@@ -33,6 +35,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -159,6 +162,19 @@ public abstract class AbstractKeyedStateBackend<K>
 			ReducingStateDescriptor<T> stateDesc) throws Exception;
 
 	/**
+	 * Creates and returns a new {@link AggregatingState}.
+	 *
+	 * @param namespaceSerializer TypeSerializer for the state namespace.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <N> The type of the namespace.
+	 * @param <T> The type of the values that the {@code ListState} can store.
+	 */
+	protected abstract <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+			TypeSerializer<N> namespaceSerializer,
+			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception;
+
+	/**
 	 * Creates and returns a new {@link FoldingState}.
 	 *
 	 * @param namespaceSerializer TypeSerializer for the state namespace.
@@ -265,6 +281,13 @@ public abstract class AbstractKeyedStateBackend<K>
 			}
 
 			@Override
+			public <T, ACC, R> AggregatingState<T, R> createAggregatingState(
+					AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
+				return AbstractKeyedStateBackend.this.createAggregatingState(namespaceSerializer, stateDesc);
+			}
+			
+
+			@Override
 			public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 				return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
new file mode 100644
index 0000000..624b83e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Heap-backed partitioned {@link ReducingState} that is
+ * snapshotted into files.
+ * 
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <IN> The type of the value added to the state.
+ * @param <ACC> The type of the value stored in the state (the accumulator type).
+ * @param <OUT> The type of the value returned from the state.
+ */
+public class HeapAggregatingState<K, N, IN, ACC, OUT>
+		extends AbstractHeapMergingState<K, N, IN, OUT, ACC, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
+		implements InternalAggregatingState<N, IN, OUT> {
+
+	private final AggregateFunction<IN, ACC, OUT> aggFunction;
+
+	/**
+	 * Creates a new key/value state for the given hash map of key/value pairs.
+	 *
+	 * @param backend
+	 *             The state backend backing that created this state.
+	 * @param stateDesc
+	 *             The state identifier for the state. This contains name and can create a default state value.
+	 * @param stateTable
+	 *             The state table to use in this kev/value state. May contain initial state.
+	 * @param namespaceSerializer
+	 *             The serializer for the type that indicates the namespace
+	 */
+	public HeapAggregatingState(
+			KeyedStateBackend<K> backend,
+			AggregatingStateDescriptor<IN, ACC, OUT> stateDesc,
+			StateTable<K, N, ACC> stateTable,
+			TypeSerializer<K> keySerializer,
+			TypeSerializer<N> namespaceSerializer) {
+
+		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+		this.aggFunction = stateDesc.getAggregateFunction();
+	}
+
+	// ------------------------------------------------------------------------
+	//  state access
+	// ------------------------------------------------------------------------
+
+	@Override
+	public OUT get() {
+		final K key = backend.getCurrentKey();
+
+		checkState(currentNamespace != null, "No namespace set.");
+		checkState(key != null, "No key set.");
+
+		Map<N, Map<K, ACC>> namespaceMap =
+				stateTable.get(backend.getCurrentKeyGroupIndex());
+
+		if (namespaceMap == null) {
+			return null;
+		}
+
+		Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
+
+		if (keyedMap == null) {
+			return null;
+		}
+
+		ACC accumulator = keyedMap.get(key);
+		return aggFunction.getResult(accumulator);
+	}
+
+	@Override
+	public void add(IN value) throws IOException {
+		final K key = backend.getCurrentKey();
+
+		checkState(currentNamespace != null, "No namespace set.");
+		checkState(key != null, "No key set.");
+
+		if (value == null) {
+			clear();
+			return;
+		}
+
+		Map<N, Map<K, ACC>> namespaceMap =
+				stateTable.get(backend.getCurrentKeyGroupIndex());
+
+		if (namespaceMap == null) {
+			namespaceMap = createNewMap();
+			stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
+		}
+
+		Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
+
+		if (keyedMap == null) {
+			keyedMap = createNewMap();
+			namespaceMap.put(currentNamespace, keyedMap);
+		}
+
+		// if this is the first value for the key, create a new accumulator
+		ACC accumulator = keyedMap.get(key);
+		if (accumulator == null) {
+			accumulator = aggFunction.createAccumulator();
+			keyedMap.put(key, accumulator);
+		}
+
+		// 
+		aggFunction.add(value, accumulator);
+	}
+
+	// ------------------------------------------------------------------------
+	//  state merging
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected ACC mergeState(ACC a, ACC b) throws Exception {
+		return aggFunction.merge(a, b);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index b05b874..b4c2b8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.commons.io.IOUtils;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -50,12 +52,14 @@ import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -169,6 +173,15 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
+	public <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+			TypeSerializer<N> namespaceSerializer,
+			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
+
+		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+		return new HeapAggregatingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
+	}
+
+	@Override
 	protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
 			TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index 7804cb4..090a660 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -49,7 +49,7 @@ public class HeapReducingState<K, N, V>
 	 * @param backend The state backend backing that created this state.
 	 * @param stateDesc The state identifier for the state. This contains name
 	 *                           and can create a default state value.
-	 * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+	 * @param stateTable The state table to use in this kev/value state. May contain initial state.
 	 */
 	public HeapReducingState(
 			KeyedStateBackend<K> backend,

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
new file mode 100644
index 0000000..15a8e31
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.AggregatingState;
+
+/**
+ * The peer to the {@link AggregatingState} in the internal state type hierarchy.
+ * 
+ * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
+ * 
+ * @param <N>   The type of the namespace
+ * @param <IN>  Type of the value added to the state.
+ * @param <OUT> Type of the value extracted from the state.
+ */
+public interface InternalAggregatingState<N, IN, OUT> 
+		extends InternalMergingState<N, IN, OUT>, AggregatingState<IN, OUT> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
index 40e625c..76fa58f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
@@ -26,6 +26,6 @@ import org.apache.flink.api.common.state.ReducingState;
  * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
  * 
  * @param <N> The type of the namespace
- * @param <T> The type of elements in the list
+ * @param <T> The type of elements in the aggregated by the ReduceFunction
  */
 public interface InternalReducingState<N, T> extends InternalMergingState<N, T, T>, ReducingState<T> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 2448540..66e8d02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -161,12 +161,13 @@ public class SerializationProxiesTest {
 	@Test
 	public void testFixTypeOrder() {
 		// ensure all elements are covered
-		Assert.assertEquals(5, StateDescriptor.Type.values().length);
+		Assert.assertEquals(6, StateDescriptor.Type.values().length);
 		// fix the order of elements to keep serialization format stable
 		Assert.assertEquals(0, StateDescriptor.Type.UNKNOWN.ordinal());
 		Assert.assertEquals(1, StateDescriptor.Type.VALUE.ordinal());
 		Assert.assertEquals(2, StateDescriptor.Type.LIST.ordinal());
 		Assert.assertEquals(3, StateDescriptor.Type.REDUCING.ordinal());
 		Assert.assertEquals(4, StateDescriptor.Type.FOLDING.ordinal());
+		Assert.assertEquals(5, StateDescriptor.Type.AGGREGATING.ordinal());
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
new file mode 100644
index 0000000..a7ae5be
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the simple Java heap objects implementation of the {@link AggregatingState}.
+ */
+public class HeapAggregatingStateTest {
+
+	@Test
+	public void testAddAndGet() throws Exception {
+
+		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+				new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
+		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+
+		try {
+			InternalAggregatingState<VoidNamespace, Long, Long> state =
+					keyedBackend.createAggregatingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+			state.add(17L);
+			state.add(11L);
+			assertEquals(28L, state.get().longValue());
+
+			keyedBackend.setCurrentKey("abc");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertNull(state.get());
+			state.add(1L);
+			state.add(2L);
+
+			keyedBackend.setCurrentKey("def");
+			assertEquals(28L, state.get().longValue());
+			state.clear();
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			state.add(3L);
+			state.add(2L);
+			state.add(1L);
+
+			keyedBackend.setCurrentKey("def");
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("g");
+			assertEquals(9L, state.get().longValue());
+			state.clear();
+
+			// make sure all lists / maps are cleared
+
+			StateTable<String, VoidNamespace, MutableLong> stateTable =
+					((HeapAggregatingState<String, VoidNamespace, Long, MutableLong, Long>) state).stateTable;
+
+			assertTrue(stateTable.isEmpty());
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	@Test
+	public void testMerging() throws Exception {
+
+		final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+				new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
+		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		final Integer namespace1 = 1;
+		final Integer namespace2 = 2;
+		final Integer namespace3 = 3;
+
+		final Long expectedResult = 165L;
+
+		final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+
+		try {
+			InternalAggregatingState<Integer, Long, Long> state =
+					keyedBackend.createAggregatingState(IntSerializer.INSTANCE, stateDescr);
+
+			// populate the different namespaces
+			//  - abc spreads the values over three namespaces
+			//  - def spreads teh values over two namespaces (one empty)
+			//  - ghi is empty
+			//  - jkl has all elements already in the target namespace
+			//  - mno has all elements already in one source namespace
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.add(33L);
+			state.add(55L);
+
+			state.setCurrentNamespace(namespace2);
+			state.add(22L);
+			state.add(11L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(44L);
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(44L);
+
+			state.setCurrentNamespace(namespace3);
+			state.add(22L);
+			state.add(55L);
+			state.add(33L);
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace3);
+			state.add(11L);
+			state.add(22L);
+			state.add(33L);
+			state.add(44L);
+			state.add(55L);
+
+			keyedBackend.setCurrentKey("abc");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("def");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("ghi");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertNull(state.get());
+
+			keyedBackend.setCurrentKey("jkl");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			keyedBackend.setCurrentKey("mno");
+			state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+			state.setCurrentNamespace(namespace1);
+			assertEquals(expectedResult, state.get());
+
+			// make sure all lists / maps are cleared
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("ghi");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace1);
+			state.clear();
+
+			StateTable<String, Integer, MutableLong> stateTable =
+					((HeapAggregatingState<String, Integer, Long, MutableLong, Long>) state).stateTable;
+
+			assertTrue(stateTable.isEmpty());
+		}
+		finally {
+			keyedBackend.close();
+			keyedBackend.dispose();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
+		return new HeapKeyedStateBackend<>(
+				mock(TaskKvStateRegistry.class),
+				StringSerializer.INSTANCE,
+				HeapAggregatingStateTest.class.getClassLoader(),
+				16,
+				new KeyGroupRange(0, 15));
+	}
+
+	// ------------------------------------------------------------------------
+	//  test functions
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static class AddingFunction implements AggregateFunction<Long, MutableLong, Long> {
+
+		@Override
+		public MutableLong createAccumulator() {
+			return new MutableLong();
+		}
+
+		@Override
+		public void add(Long value, MutableLong accumulator) {
+			accumulator.value += value;
+		}
+
+		@Override
+		public Long getResult(MutableLong accumulator) {
+			return accumulator.value;
+		}
+
+		@Override
+		public MutableLong merge(MutableLong a, MutableLong b) {
+			a.value += b.value;
+			return a;
+		}
+	}
+
+	private static final class MutableLong {
+		long value;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 704875b..30e64c4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -20,10 +20,12 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -36,6 +38,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
@@ -63,6 +66,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@code WindowedStream} represents a data stream where elements are grouped by
  * key, and for each key, the stream of elements is split into windows based on a
@@ -356,6 +361,10 @@ public class WindowedStream<T, K, W extends Window> {
 		return input.transform(opName, resultType, operator);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Fold Function
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Applies the given fold function to each window. The window function is called for each
 	 * evaluation of the window for each key individually. The output of the reduce function is
@@ -500,6 +509,172 @@ public class WindowedStream<T, K, W extends Window> {
 		return input.transform(opName, resultType, operator);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Aggregation Function
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Applies the given fold function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the reduce function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * @param function The fold function.
+	 * @return The data stream that is the result of applying the fold function to the window.
+	 */
+	public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
+		checkNotNull(function, "function");
+
+		if (function instanceof RichFunction) {
+			throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
+		}
+
+		TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
+				function, input.getType(), null, false);
+
+		TypeInformation<R> resultType = TypeExtractor.getAggregateFunctionReturnType(
+				function, input.getType(), null, false);
+
+		return aggregate(function, accumulatorType, resultType);
+	}
+
+	/**
+	 * Applies the given aggregation function to each window. The aggregation function is called for
+	 * each element, aggregating values incrementally and keeping the state to one accumulator
+	 * per key and window.
+	 *
+	 * @param function The aggregation function.
+	 * @return The data stream that is the result of applying the aggregation function to the window.
+	 */
+	public <ACC, R> SingleOutputStreamOperator<R> aggregate(
+			AggregateFunction<T, ACC, R> function,
+			TypeInformation<ACC> accumulatorType,
+			TypeInformation<R> resultType) {
+
+		checkNotNull(function, "function");
+		checkNotNull(accumulatorType, "accumulatorType");
+		checkNotNull(resultType, "resultType");
+
+		if (function instanceof RichFunction) {
+			throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
+		}
+
+		return aggregate(function, new PassThroughWindowFunction<K, W, R>(), accumulatorType, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
+	 * that the window function typically has only a single value to process when called.
+	 *
+	 * @param aggFunction The aggregate function that is used for incremental aggregation.
+	 * @param windowFunction The window function.
+	 * 
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <ACC, R> SingleOutputStreamOperator<R> aggregate(
+			AggregateFunction<T, ACC, R> aggFunction,
+			WindowFunction<R, R, K, W> windowFunction) {
+
+		checkNotNull(aggFunction, "aggFunction");
+		checkNotNull(windowFunction, "windowFunction");
+
+		TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
+				aggFunction, input.getType(), null, false);
+
+		TypeInformation<R> resultType = TypeExtractor.getAggregateFunctionReturnType(
+				aggFunction, input.getType(), null, false);
+
+		return aggregate(aggFunction, windowFunction, accumulatorType, resultType);
+	}
+
+	/**
+	 * Applies the given window function to each window. The window function is called for each
+	 * evaluation of the window for each key individually. The output of the window function is
+	 * interpreted as a regular non-windowed stream.
+	 *
+	 * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
+	 * that the window function typically has only a single value to process when called.
+	 *
+	 * @param aggregateFunction The aggregation function that is used for incremental aggregation.
+	 * @param windowFunction The window function.
+	 * @param accumulatorType Type information for the internal accumulator type of the aggregation function
+	 * @param resultType Type information for the result type of the window function
+	 *    
+	 * @return The data stream that is the result of applying the window function to the window.
+	 */
+	public <ACC, R> SingleOutputStreamOperator<R> aggregate(
+			AggregateFunction<T, ACC, R> aggregateFunction,
+			WindowFunction<R, R, K, W> windowFunction, 
+			TypeInformation<ACC> accumulatorType,
+			TypeInformation<R> resultType) {
+
+		checkNotNull(aggregateFunction, "aggregateFunction");
+		checkNotNull(windowFunction, "windowFunction");
+		checkNotNull(accumulatorType, "accumulatorType");
+		checkNotNull(resultType, "resultType");
+
+		if (aggregateFunction instanceof RichFunction) {
+			throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
+		}
+
+		//clean the closures
+		windowFunction = input.getExecutionEnvironment().clean(windowFunction);
+		aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
+
+		String callLocation = Utils.getCallLocationName();
+		String udfName = "WindowedStream." + callLocation;
+
+		String opName;
+		KeySelector<T, K> keySel = input.getKeySelector();
+
+		OneInputStreamOperator<T, R> operator;
+
+		if (evictor != null) {
+			@SuppressWarnings({"unchecked", "rawtypes"})
+			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+					(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+			ListStateDescriptor<StreamRecord<T>> stateDesc =
+					new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+			operator = new EvictingWindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalIterableWindowFunction<>(new AggregateApplyWindowFunction<>(aggregateFunction, windowFunction)),
+					trigger,
+					evictor,
+					allowedLateness);
+
+		} else {
+			AggregatingStateDescriptor<T, ACC, R> stateDesc = new AggregatingStateDescriptor<>("window-contents",
+					aggregateFunction, accumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
+
+			opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+			operator = new WindowOperator<>(windowAssigner,
+					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+					keySel,
+					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+					stateDesc,
+					new InternalSingleValueWindowFunction<>(windowFunction),
+					trigger,
+					allowedLateness);
+		}
+
+		return input.transform(opName, resultType, operator);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Window Function (apply)
+	// ------------------------------------------------------------------------
+	
 	/**
 	 * Applies the given window function to each window. The window function is called for each
 	 * evaluation of the window for each key individually. The output of the window function is
@@ -792,7 +967,7 @@ public class WindowedStream<T, K, W extends Window> {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Aggregations on the keyed windows
+	//  Pre-defined aggregations on the keyed windows
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
new file mode 100644
index 0000000..1b9fa88
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+@Internal
+public class AggregateApplyAllWindowFunction<W extends Window, T, ACC, R>
+	extends WrappingFunction<AllWindowFunction<R, R, W>>
+	implements AllWindowFunction<T, R, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final AggregateFunction<T, ACC, R> aggFunction;
+
+	public AggregateApplyAllWindowFunction(
+			AggregateFunction<T, ACC, R> aggFunction,
+			AllWindowFunction<R, R, W> windowFunction) {
+
+		super(windowFunction);
+		this.aggFunction = aggFunction;
+	}
+
+	@Override
+	public void apply(W window, Iterable<T> values, Collector<R> out) throws Exception {
+		final ACC acc = aggFunction.createAccumulator();
+
+		for (T value : values) {
+			aggFunction.add(value, acc);
+		}
+
+		wrappedFunction.apply(window, Collections.singletonList(aggFunction.getResult(acc)), out);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
new file mode 100644
index 0000000..5200bc2
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+@Internal
+public class AggregateApplyWindowFunction<K, W extends Window, T, ACC, R>
+	extends WrappingFunction<WindowFunction<R, R, K, W>>
+	implements WindowFunction<T, R, K, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final AggregateFunction<T, ACC, R> aggFunction;
+
+	public AggregateApplyWindowFunction(AggregateFunction<T, ACC, R> aggFunction, WindowFunction<R, R, K, W> windowFunction) {
+		super(windowFunction);
+		this.aggFunction = aggFunction;
+	}
+
+	@Override
+	public void apply(K key, W window, Iterable<T> values, Collector<R> out) throws Exception {
+		final ACC acc = aggFunction.createAccumulator();
+
+		for (T val : values) {
+			aggFunction.add(val, acc);
+		}
+
+		wrappedFunction.apply(key, window, Collections.singletonList(aggFunction.getResult(acc)), out);
+	}
+}