You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/22 22:12:16 UTC
[9/9] flink git commit: [FLINK-5582] [streaming] Add
'AggregateFunction' and 'AggregatingState'.
[FLINK-5582] [streaming] Add 'AggregateFunction' and 'AggregatingState'.
The AggregateFunction implements a very flexible interface for distributive aggregations.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09380e49
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09380e49
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09380e49
Branch: refs/heads/master
Commit: 09380e49256bff924734b9a932808e0f4daa7e5c
Parents: 2b3fd39
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 10 19:24:49 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jan 22 21:53:40 2017 +0100
----------------------------------------------------------------------
.../state/RocksDBAggregatingState.java | 205 ++++++++++++++
.../state/RocksDBKeyedStateBackend.java | 11 +
.../state/RocksDBAggregatingStateTest.java | 248 +++++++++++++++++
.../state/RocksDBStateBackendConfigTest.java | 4 +-
.../api/common/functions/AggregateFunction.java | 94 +++++++
.../api/common/state/AggregatingState.java | 45 +++
.../state/AggregatingStateDescriptor.java | 145 ++++++++++
.../flink/api/common/state/StateBackend.java | 13 +-
.../flink/api/common/state/StateDescriptor.java | 10 +-
.../flink/api/java/typeutils/TypeExtractor.java | 23 ++
.../apache/flink/core/fs/CloseableRegistry.java | 2 +
.../memory/ByteArrayInputStreamWithPos.java | 2 +
.../state/AbstractKeyedStateBackend.java | 23 ++
.../state/heap/HeapAggregatingState.java | 147 ++++++++++
.../state/heap/HeapKeyedStateBackend.java | 13 +
.../runtime/state/heap/HeapReducingState.java | 2 +-
.../internal/InternalAggregatingState.java | 33 +++
.../state/internal/InternalReducingState.java | 2 +-
.../runtime/state/SerializationProxiesTest.java | 3 +-
.../state/heap/HeapAggregatingStateTest.java | 274 +++++++++++++++++++
.../api/datastream/WindowedStream.java | 177 +++++++++++-
.../AggregateApplyAllWindowFunction.java | 56 ++++
.../windowing/AggregateApplyWindowFunction.java | 52 ++++
23 files changed, 1575 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
new file mode 100644
index 0000000..1f306b4
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * An {@link AggregatingState} implementation that stores state in RocksDB.
+ *
+ * <p>For every key and namespace, the state stores the accumulator of the
+ * {@link AggregateFunction}, serialized with the state descriptor's serializer.
+ * {@link #add(Object)} performs a read-modify-write of that accumulator, and {@link #get()}
+ * returns the result of {@link AggregateFunction#getResult(Object)} applied to it.
+ *
+ * @param <K> The type of the key
+ * @param <N> The type of the namespace
+ * @param <T> The type of the values that are aggregated into the state
+ * @param <ACC> The type of the value stored in the state (the accumulator type)
+ * @param <R> The type of the value returned from the state
+ */
+public class RocksDBAggregatingState<K, N, T, ACC, R>
+		extends AbstractRocksDBState<K, N, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>, ACC>
+		implements InternalAggregatingState<N, T, R> {
+
+	/** Serializer for the accumulator values stored in RocksDB */
+	private final TypeSerializer<ACC> valueSerializer;
+
+	/** User-specified aggregation function */
+	private final AggregateFunction<T, ACC, R> aggFunction;
+
+	/**
+	 * We disable writes to the write-ahead-log here. We can't have these in the base class
+	 * because JNI segfaults for some reason if they are.
+	 */
+	private final WriteOptions writeOptions;
+
+	/**
+	 * Creates a new {@code RocksDBAggregatingState}.
+	 *
+	 * @param columnFamily
+	 *             The RocksDB column family in which this state's entries are stored.
+	 * @param namespaceSerializer
+	 *             The serializer for the namespace.
+	 * @param stateDesc
+	 *             The state identifier for the state. This contains the state name and aggregation function.
+	 * @param backend
+	 *             The keyed backend that provides the RocksDB instance and the current key / key group.
+	 */
+	public RocksDBAggregatingState(
+			ColumnFamilyHandle columnFamily,
+			TypeSerializer<N> namespaceSerializer,
+			AggregatingStateDescriptor<T, ACC, R> stateDesc,
+			RocksDBKeyedStateBackend<K> backend) {
+
+		super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+		this.valueSerializer = stateDesc.getSerializer();
+		this.aggFunction = stateDesc.getAggregateFunction();
+
+		writeOptions = new WriteOptions();
+		writeOptions.setDisableWAL(true);
+	}
+
+	/**
+	 * Returns the aggregation result for the current key and namespace.
+	 *
+	 * @return The result of {@link AggregateFunction#getResult(Object)} on the stored
+	 *         accumulator, or {@code null} if nothing is stored for the current key and namespace.
+	 * @throws IOException If reading from RocksDB or deserializing the accumulator fails.
+	 */
+	@Override
+	public R get() throws IOException {
+		try {
+			// prepare the current key and namespace for RocksDB lookup
+			writeCurrentKeyWithGroupAndNamespace();
+			final byte[] key = keySerializationStream.toByteArray();
+
+			// get the current value
+			final byte[] valueBytes = backend.db.get(columnFamily, key);
+
+			if (valueBytes == null) {
+				return null;
+			}
+
+			ACC accumulator = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+			return aggFunction.getResult(accumulator);
+		}
+		catch (IOException | RocksDBException e) {
+			throw new IOException("Error while retrieving value from RocksDB", e);
+		}
+	}
+
+	/**
+	 * Aggregates the given value into the accumulator of the current key and namespace.
+	 * If no accumulator is stored yet, a fresh one is created via
+	 * {@link AggregateFunction#createAccumulator()}.
+	 *
+	 * @param value The value to aggregate.
+	 * @throws IOException If reading from or writing to RocksDB, or (de)serialization, fails.
+	 */
+	@Override
+	public void add(T value) throws IOException {
+		try {
+			// prepare the current key and namespace for RocksDB lookup
+			writeCurrentKeyWithGroupAndNamespace();
+			final byte[] key = keySerializationStream.toByteArray();
+			// the stream is reused below to serialize the value; the key was copied out above
+			keySerializationStream.reset();
+
+			// get the current value
+			final byte[] valueBytes = backend.db.get(columnFamily, key);
+
+			// deserialize the current accumulator, or create a blank one
+			final ACC accumulator = valueBytes == null ?
+					aggFunction.createAccumulator() :
+					valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+
+			// aggregate the value into the accumulator
+			aggFunction.add(value, accumulator);
+
+			// serialize the new accumulator
+			final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+			valueSerializer.serialize(accumulator, out);
+
+			// write the new value to RocksDB
+			backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+		}
+		catch (IOException | RocksDBException e) {
+			throw new IOException("Error while adding value to RocksDB", e);
+		}
+	}
+
+	/**
+	 * Merges the accumulators of the given source namespaces into the target namespace,
+	 * for the current key.
+	 *
+	 * <p>Every source entry is removed from RocksDB once it has been read, so namespaces that
+	 * were merged away do not leave stale state behind. As a consequence, a target that also
+	 * appears among the sources contributes its accumulator only once.
+	 *
+	 * @param target The namespace that receives the merged accumulator.
+	 * @param sources The namespaces whose accumulators are merged into the target.
+	 * @throws Exception Wrapping any failure from RocksDB, serialization, or the aggregate function.
+	 */
+	@Override
+	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+		if (sources == null || sources.isEmpty()) {
+			return;
+		}
+
+		// cache key and key group, they are the same for all namespaces below
+		final K key = backend.getCurrentKey();
+		final int keyGroup = backend.getCurrentKeyGroupIndex();
+
+		try {
+			ACC current = null;
+
+			// merge the sources, removing each one after it has been read
+			for (N source : sources) {
+				if (source != null) {
+					writeKeyWithGroupAndNamespace(
+							keyGroup, key, source,
+							keySerializationStream, keySerializationDataOutputView);
+
+					final byte[] sourceKey = keySerializationStream.toByteArray();
+					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+
+					if (valueBytes != null) {
+						// the source's contents now live in 'current'; drop the source entry
+						backend.db.remove(columnFamily, writeOptions, sourceKey);
+
+						ACC value = valueSerializer.deserialize(
+								new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+
+						if (current != null) {
+							current = aggFunction.merge(current, value);
+						}
+						else {
+							current = value;
+						}
+					}
+				}
+			}
+
+			// if something came out of merging the sources, merge it or write it to the target
+			if (current != null) {
+				// create the target full-binary-key
+				writeKeyWithGroupAndNamespace(
+						keyGroup, key, target,
+						keySerializationStream, keySerializationDataOutputView);
+
+				final byte[] targetKey = keySerializationStream.toByteArray();
+				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
+
+				if (targetValueBytes != null) {
+					// target also had a value, merge
+					ACC value = valueSerializer.deserialize(
+							new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
+
+					current = aggFunction.merge(current, value);
+				}
+
+				// serialize the resulting value
+				keySerializationStream.reset();
+				valueSerializer.serialize(current, keySerializationDataOutputView);
+
+				// write the resulting value
+				backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
+			}
+		}
+		catch (Exception e) {
+			throw new Exception("Error while merging state in RocksDB", e);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 5a6d102..21ef8c2 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -18,6 +18,7 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -48,6 +49,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
@@ -842,6 +844,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
+	protected <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+			TypeSerializer<N> namespaceSerializer,
+			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
+
+		// resolve the column family for this descriptor and wrap it in an aggregating state
+		return new RocksDBAggregatingState<>(
+				getColumnFamily(stateDesc, namespaceSerializer),
+				namespaceSerializer,
+				stateDesc,
+				this);
+	}
+
+ @Override
protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
new file mode 100644
index 0000000..983e569
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static java.util.Arrays.asList;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link InternalAggregatingState} implementation on top of RocksDB.
+ */
+public class RocksDBAggregatingStateTest {
+
+ @Rule
+ public final TemporaryFolder tmp = new TemporaryFolder();
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testAddAndGet() throws Exception {
+
+ final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+ new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
+ stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
+ backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
+
+ final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
+
+ try {
+ InternalAggregatingState<VoidNamespace, Long, Long> state =
+ keyedBackend.createAggregatingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+ state.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+ state.add(17L);
+ state.add(11L);
+ assertEquals(28L, state.get().longValue());
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertNull(state.get());
+ state.add(1L);
+ state.add(2L);
+
+ keyedBackend.setCurrentKey("def");
+ assertEquals(28L, state.get().longValue());
+ state.clear();
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ state.add(3L);
+ state.add(2L);
+ state.add(1L);
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertEquals(9L, state.get().longValue());
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
+ public void testMerging() throws Exception {
+
+ final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+ new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
+ stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ final TimeWindow win1 = new TimeWindow(1000, 2000);
+ final TimeWindow win2 = new TimeWindow(2000, 3000);
+ final TimeWindow win3 = new TimeWindow(3000, 4000);
+
+ final Long expectedResult = 165L;
+
+ final RocksDBStateBackend backend = new RocksDBStateBackend(tmp.newFolder().toURI());
+ backend.setDbStoragePath(tmp.newFolder().getAbsolutePath());
+
+ final RocksDBKeyedStateBackend<String> keyedBackend = createKeyedBackend(backend);
+
+ try {
+ InternalAggregatingState<TimeWindow, Long, Long> state =
+ keyedBackend.createAggregatingState(new TimeWindow.Serializer(), stateDescr);
+
+ // populate the different namespaces
+ // - abc spreads the values over three namespaces
+ // - def spreads teh values over two namespaces (one empty)
+ // - ghi is empty
+ // - jkl has all elements already in the target namespace
+ // - mno has all elements already in one source namespace
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(win1);
+ state.add(33L);
+ state.add(55L);
+
+ state.setCurrentNamespace(win2);
+ state.add(22L);
+ state.add(11L);
+
+ state.setCurrentNamespace(win3);
+ state.add(44L);
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(win1);
+ state.add(11L);
+ state.add(44L);
+
+ state.setCurrentNamespace(win3);
+ state.add(22L);
+ state.add(55L);
+ state.add(33L);
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(win1);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(win3);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("abc");
+ state.mergeNamespaces(win1, asList(win2, win3));
+ state.setCurrentNamespace(win1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("def");
+ state.mergeNamespaces(win1, asList(win2, win3));
+ state.setCurrentNamespace(win1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("ghi");
+ state.mergeNamespaces(win1, asList(win2, win3));
+ state.setCurrentNamespace(win1);
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("jkl");
+ state.mergeNamespaces(win1, asList(win2, win3));
+ state.setCurrentNamespace(win1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("mno");
+ state.mergeNamespaces(win1, asList(win2, win3));
+ state.setCurrentNamespace(win1);
+ assertEquals(expectedResult, state.get());
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception {
+ return (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend(
+ new DummyEnvironment("TestTask", 1, 0),
+ new JobID(),
+ "test-op",
+ StringSerializer.INSTANCE,
+ 16,
+ new KeyGroupRange(2, 3),
+ mock(TaskKvStateRegistry.class));
+ }
+
+ // test functions
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("serial")
+ private static class AddingFunction implements AggregateFunction<Long, MutableLong, Long> {
+
+ @Override
+ public MutableLong createAccumulator() {
+ return new MutableLong();
+ }
+
+ @Override
+ public void add(Long value, MutableLong accumulator) {
+ accumulator.value += value;
+ }
+
+ @Override
+ public Long getResult(MutableLong accumulator) {
+ return accumulator.value;
+ }
+
+ @Override
+ public MutableLong merge(MutableLong a, MutableLong b) {
+ a.value += b.value;
+ return a;
+ }
+ }
+
+ private static final class MutableLong {
+ long value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index bc0777c..9524352 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -351,11 +351,11 @@ public class RocksDBStateBackendConfigTest {
// Utilities
// ------------------------------------------------------------------------
- private static Environment getMockEnvironment() {
+ /** Delegates to {@link #getMockEnvironment(File[])}, using {@code java.io.tmpdir} as the only temp directory. */
+ static Environment getMockEnvironment() {
return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) });
}
- private static Environment getMockEnvironment(File[] tempDirs) {
+ static Environment getMockEnvironment(File[] tempDirs) {
final String[] tempDirStrings = new String[tempDirs.length];
for (int i = 0; i < tempDirs.length; i++) {
tempDirStrings[i] = tempDirs[i].getAbsolutePath();
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
new file mode 100644
index 0000000..507be63
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+/**
+ * The {@code AggregateFunction} is a flexible aggregation function. Input values are added
+ * one at a time into an intermediate aggregate (the <i>accumulator</i>), accumulators can be
+ * merged with each other, and the final result is extracted from an accumulator.
+ *
+ * <p>The input type ({@code IN}), the accumulator type ({@code ACC}), and the result type
+ * ({@code OUT}) may all differ. This allows, for example, computing an average by keeping a
+ * count and a sum in the accumulator and dividing them only when the result is requested.
+ *
+ * <p>Aggregation functions must be {@link Serializable} because they are sent around
+ * between distributed processes during distributed execution.
+ *
+ * <p>An example how to use this interface is below:
+ *
+ * <pre>{@code
+ * // the accumulator, which holds the state of the in-flight aggregate
+ * public class AverageAccumulator {
+ *     long count;
+ *     long sum;
+ * }
+ *
+ * // implementation of an aggregation function for an 'average'
+ * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
+ *
+ *     public AverageAccumulator createAccumulator() {
+ *         return new AverageAccumulator();
+ *     }
+ *
+ *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
+ *         a.count += b.count;
+ *         a.sum += b.sum;
+ *         return a;
+ *     }
+ *
+ *     public void add(Integer value, AverageAccumulator acc) {
+ *         acc.sum += value;
+ *         acc.count++;
+ *     }
+ *
+ *     public Double getResult(AverageAccumulator acc) {
+ *         return acc.sum / (double) acc.count;
+ *     }
+ * }
+ *
+ * // implementation of a weighted average
+ * // this reuses the same accumulator type as the aggregate function for 'average':
+ * // 'count' holds the total weight, 'sum' holds the sum of weight * value
+ * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
+ *
+ *     public AverageAccumulator createAccumulator() {
+ *         return new AverageAccumulator();
+ *     }
+ *
+ *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
+ *         a.count += b.count;
+ *         a.sum += b.sum;
+ *         return a;
+ *     }
+ *
+ *     public void add(Datum value, AverageAccumulator acc) {
+ *         acc.count += value.getWeight();
+ *         acc.sum += value.getValue() * value.getWeight();
+ *     }
+ *
+ *     public Double getResult(AverageAccumulator acc) {
+ *         return acc.sum / (double) acc.count;
+ *     }
+ * }
+ * }</pre>
+ *
+ * @param <IN>  The type of the values that are aggregated (input values)
+ * @param <ACC> The type of the accumulator (intermediate aggregate state)
+ * @param <OUT> The type of the aggregated result
+ */
+public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
+
+	/**
+	 * Creates a new accumulator, starting a new aggregate.
+	 *
+	 * @return A new accumulator, representing an aggregate to which no value has been added yet.
+	 */
+	ACC createAccumulator();
+
+	/**
+	 * Adds the given input value to the given accumulator.
+	 *
+	 * <p>Since this method returns nothing, the value must be incorporated by modifying the
+	 * accumulator in place.
+	 *
+	 * @param value The value to add.
+	 * @param accumulator The accumulator to add the value to.
+	 */
+	void add(IN value, ACC accumulator);
+
+	/**
+	 * Gets the result of the aggregation from the accumulator.
+	 *
+	 * @param accumulator The accumulator of the aggregation.
+	 * @return The final aggregation result.
+	 */
+	OUT getResult(ACC accumulator);
+
+	/**
+	 * Merges two accumulators, returning an accumulator with the merged state.
+	 *
+	 * <p>This function may reuse either of the given accumulators as the target of the merge
+	 * and return it; callers must use the returned accumulator.
+	 *
+	 * @param a An accumulator to merge.
+	 * @param b Another accumulator to merge.
+	 * @return The accumulator with the merged state.
+	 */
+	ACC merge(ACC a, ACC b);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
new file mode 100644
index 0000000..c679285
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AggregateFunction;
+
+/**
+ * {@link State} interface for aggregating state, based on an
+ * {@link AggregateFunction}. Elements that are added to this type of state will
+ * be eagerly pre-aggregated using a given {@code AggregateFunction}.
+ *
+ * <p>Internally, the state always holds the accumulator type of the {@code AggregateFunction}.
+ * When the result of the state is accessed, the function's
+ * {@link AggregateFunction#getResult(Object)} method is applied to the accumulator to
+ * produce the returned value.
+ *
+ * <p>The state is accessed and modified by user functions, and checkpointed consistently
+ * by the system as part of the distributed snapshots.
+ *
+ * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * automatically supplied by the system, so the function always sees the value mapped to the
+ * key of the current element. That way, the system can handle stream and state partitioning
+ * consistently together.
+ *
+ * @param <IN> Type of the value added to the state.
+ * @param <OUT> Type of the value extracted from the state.
+ */
+@PublicEvolving
+public interface AggregatingState<IN, OUT> extends MergingState<IN, OUT> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
new file mode 100644
index 0000000..abdac91
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A StateDescriptor for {@link AggregatingState}.
+ *
+ * <p>The type internally stored in the state is the accumulator type ({@code ACC}) of the
+ * {@code AggregateFunction}, not the input or the result type.
+ *
+ * @param <IN> The type of the values that are added to the state.
+ * @param <ACC> The type of the accumulator (intermediate aggregation state).
+ * @param <OUT> The type of the values that are returned from the state.
+ */
+@PublicEvolving
+public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AggregatingState<IN, OUT>, ACC> {
+ private static final long serialVersionUID = 1L;
+
+ /** The aggregation function for the state. */
+ private final AggregateFunction<IN, ACC, OUT> aggFunction;
+
+ /**
+ * Creates a new state descriptor with the given name, function, and accumulator class.
+ *
+ * <p>If this constructor fails (because it is not possible to describe the type via a class),
+ * consider using the {@link #AggregatingStateDescriptor(String, AggregateFunction, TypeInformation)} constructor.
+ *
+ * @param name The (unique) name for the state.
+ * @param aggFunction The {@code AggregateFunction} used to aggregate the state.
+ * @param stateType The type of the accumulator. The accumulator is stored in the state.
+ */
+ public AggregatingStateDescriptor(
+ String name,
+ AggregateFunction<IN, ACC, OUT> aggFunction,
+ Class<ACC> stateType) {
+
+ super(name, stateType, null);
+ this.aggFunction = checkNotNull(aggFunction, "aggFunction must not be null");
+ }
+
+ /**
+ * Creates a new state descriptor with the given name, function, and accumulator type information.
+ *
+ * @param name The (unique) name for the state.
+ * @param aggFunction The {@code AggregateFunction} used to aggregate the state.
+ * @param stateType The type of the accumulator. The accumulator is stored in the state.
+ */
+ public AggregatingStateDescriptor(
+ String name,
+ AggregateFunction<IN, ACC, OUT> aggFunction,
+ TypeInformation<ACC> stateType) {
+
+ super(name, stateType, null);
+ this.aggFunction = checkNotNull(aggFunction, "aggFunction must not be null");
+ }
+
+ /**
+ * Creates a new state descriptor with the given name, function, and accumulator serializer.
+ *
+ * @param name The (unique) name for the state.
+ * @param aggFunction The {@code AggregateFunction} used to aggregate the state.
+ * @param typeSerializer The serializer for the accumulator. The accumulator is stored in the state.
+ */
+ public AggregatingStateDescriptor(
+ String name,
+ AggregateFunction<IN, ACC, OUT> aggFunction,
+ TypeSerializer<ACC> typeSerializer) {
+
+ super(name, typeSerializer, null);
+ this.aggFunction = checkNotNull(aggFunction, "aggFunction must not be null");
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public AggregatingState<IN, OUT> bind(StateBackend stateBackend) throws Exception {
+ return stateBackend.createAggregatingState(this);
+ }
+
+ /**
+ * Returns the aggregate function to be used for the state.
+ */
+ public AggregateFunction<IN, ACC, OUT> getAggregateFunction() {
+ return aggFunction;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.AGGREGATING;
+ }
+
+ // ------------------------------------------------------------------------
+ // equality and hashing consider only the name and the serializer, not the aggregate function
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ else if (o != null && getClass() == o.getClass()) {
+ AggregatingStateDescriptor<?, ?, ?> that = (AggregatingStateDescriptor<?, ?, ?>) o;
+ return serializer.equals(that.serializer) && name.equals(that.name);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int result = serializer.hashCode();
+ result = 31 * result + name.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "AggregatingStateDescriptor{" +
+ "serializer=" + serializer +
+ ", aggFunction=" + aggFunction +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
index d396a31..f9d1af7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
@@ -47,11 +47,22 @@ public interface StateBackend {
* Creates and returns a new {@link ReducingState}.
* @param stateDesc The {@code StateDescriptor} that contains the name of the state.
*
- * @param <T> The type of the values that the {@code ListState} can store.
+ * @param <T> The type of the values that the {@code ReducingState} can store.
*/
<T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception;
/**
+ * Creates and returns a new {@link AggregatingState}.
+ * @param stateDesc The {@code StateDescriptor} that contains the name and the aggregate function of the state.
+ *
+ * @param <IN> The type of the values that go into the aggregating state.
+ * @param <ACC> The type of the accumulator that is stored in the aggregating state.
+ * @param <OUT> The type of the values that come out of the aggregating state.
+ */
+ <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
+ AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception;
+
+ /**
* Creates and returns a new {@link FoldingState}.
* @param stateDesc The {@code StateDescriptor} that contains the name of the state.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index ad9d417..b901d03 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -49,13 +49,19 @@ import static java.util.Objects.requireNonNull;
@PublicEvolving
public abstract class StateDescriptor<S extends State, T> implements Serializable {
- // Do not change the order of the elements in this enum, ordinal is used in serialization
+ /**
+ * An enumeration of the types of supported states. Used to identify the state type
+ * when writing and restoring checkpoints and savepoints.
+ */
+ // IMPORTANT: Never reorder the elements of this enum (the ordinal is used in serialization); only append new types at the end.
public enum Type {
- @Deprecated UNKNOWN, VALUE, LIST, REDUCING, FOLDING
+ @Deprecated UNKNOWN, VALUE, LIST, REDUCING, FOLDING, AGGREGATING
}
private static final long serialVersionUID = 1L;
+ // ------------------------------------------------------------------------
+
/** Name that uniquely identifies state created from this StateDescriptor. */
protected final String name;
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a2664f9..2b9eed9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.ClassUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -189,6 +190,28 @@ public class TypeExtractor {
}
@PublicEvolving
+ public static <IN, ACC> TypeInformation<ACC> getAggregateFunctionAccumulatorType(
+ AggregateFunction<IN, ACC, ?> function,
+ TypeInformation<IN> inType,
+ String functionName,
+ boolean allowMissing) {
+ // type-argument positions in AggregateFunction<IN, ACC, OUT>: IN is 0, ACC is 1
+ return getUnaryOperatorReturnType(
+ function, AggregateFunction.class, 0, 1, inType, functionName, allowMissing);
+ }
+
+ @PublicEvolving
+ public static <IN, OUT> TypeInformation<OUT> getAggregateFunctionReturnType(
+ AggregateFunction<IN, ?, OUT> function,
+ TypeInformation<IN> inType,
+ String functionName,
+ boolean allowMissing) {
+ // type-argument positions in AggregateFunction<IN, ACC, OUT>: IN is 0, OUT is 2
+ return getUnaryOperatorReturnType(
+ function, AggregateFunction.class, 0, 2, inType, functionName, allowMissing);
+ }
+
+ @PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType) {
return getMapPartitionReturnTypes(mapPartitionInterface, inType, null, false);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
index 81ba7ab..0d4ea0c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -18,6 +18,7 @@
package org.apache.flink.core.fs;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.util.AbstractCloseableRegistry;
import java.io.Closeable;
@@ -32,6 +33,7 @@ import java.util.Map;
* <p>
* All methods in this class are thread-safe.
*/
+@Internal
public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Object> {
private static final Object DUMMY = new Object();
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
index 46b82c7..dd381a4 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
@@ -18,6 +18,7 @@
package org.apache.flink.core.memory;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -26,6 +27,7 @@ import java.io.InputStream;
/**
* Un-synchronized stream similar to Java's ByteArrayInputStream that also exposes the current position.
*/
+@Internal
public class ByteArrayInputStreamWithPos extends InputStream {
protected byte[] buffer;
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index fcca77a..c8e0d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
@@ -33,6 +35,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
@@ -159,6 +162,19 @@ public abstract class AbstractKeyedStateBackend<K>
ReducingStateDescriptor<T> stateDesc) throws Exception;
/**
+ * Creates and returns a new {@link AggregatingState}.
+ * @param namespaceSerializer TypeSerializer for the state namespace.
+ * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that are added to the {@code AggregatingState}.
+ * @param <ACC> The type of the accumulator, i.e. the value kept in the state.
+ * @param <R> The type of the values that are returned from the {@code AggregatingState}.
+ */
+ protected abstract <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+ TypeSerializer<N> namespaceSerializer,
+ AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception;
+
+ /**
* Creates and returns a new {@link FoldingState}.
*
* @param namespaceSerializer TypeSerializer for the state namespace.
@@ -265,6 +281,13 @@ public abstract class AbstractKeyedStateBackend<K>
}
@Override
+ public <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
+ AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception {
+ // delegate to the backend-specific factory, bound to this context's namespace serializer
+ return AbstractKeyedStateBackend.this.createAggregatingState(namespaceSerializer, stateDesc);
+ }
+
+ @Override
public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
new file mode 100644
index 0000000..624b83e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Heap-backed partitioned {@link AggregatingState} that is snapshotted into files.
+ * Unlike a {@link ReducingState}, the stored accumulator type may differ from the in/out types.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <IN> The type of the value added to the state.
+ * @param <ACC> The type of the value stored in the state (the accumulator type).
+ * @param <OUT> The type of the value returned from the state.
+ */
+public class HeapAggregatingState<K, N, IN, ACC, OUT>
+ extends AbstractHeapMergingState<K, N, IN, OUT, ACC, AggregatingState<IN, OUT>, AggregatingStateDescriptor<IN, ACC, OUT>>
+ implements InternalAggregatingState<N, IN, OUT> {
+
+ private final AggregateFunction<IN, ACC, OUT> aggFunction;
+
+ /**
+ * Creates a new key/value state for the given hash map of key/value pairs.
+ *
+ * @param backend
+ * The state backend that created this state.
+ * @param stateDesc
+ * The state identifier for the state. This contains the name and the aggregate function.
+ * @param stateTable
+ * The state table to use in this key/value state. May contain initial state.
+ * @param keySerializer The serializer for the key type.
+ * @param namespaceSerializer The serializer for the type that indicates the namespace.
+ */
+ public HeapAggregatingState(
+ KeyedStateBackend<K> backend,
+ AggregatingStateDescriptor<IN, ACC, OUT> stateDesc,
+ StateTable<K, N, ACC> stateTable,
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer) {
+
+ super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
+ this.aggFunction = stateDesc.getAggregateFunction();
+ }
+
+ // ------------------------------------------------------------------------
+ // state access
+ // ------------------------------------------------------------------------
+
+ @Override
+ public OUT get() {
+ final K key = backend.getCurrentKey();
+
+ checkState(currentNamespace != null, "No namespace set.");
+ checkState(key != null, "No key set.");
+
+ Map<N, Map<K, ACC>> namespaceMap =
+ stateTable.get(backend.getCurrentKeyGroupIndex());
+
+ if (namespaceMap == null) {
+ return null;
+ }
+
+ Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
+
+ if (keyedMap == null) {
+ return null;
+ }
+ // the map is shared by all keys of this namespace, so this key may have no accumulator yet
+ ACC accumulator = keyedMap.get(key);
+ return accumulator == null ? null : aggFunction.getResult(accumulator);
+ }
+
+ @Override
+ public void add(IN value) throws IOException {
+ final K key = backend.getCurrentKey();
+
+ checkState(currentNamespace != null, "No namespace set.");
+ checkState(key != null, "No key set.");
+
+ if (value == null) {
+ clear();
+ return;
+ }
+
+ Map<N, Map<K, ACC>> namespaceMap =
+ stateTable.get(backend.getCurrentKeyGroupIndex());
+
+ if (namespaceMap == null) {
+ namespaceMap = createNewMap();
+ stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
+ }
+
+ Map<K, ACC> keyedMap = namespaceMap.get(currentNamespace);
+
+ if (keyedMap == null) {
+ keyedMap = createNewMap();
+ namespaceMap.put(currentNamespace, keyedMap);
+ }
+
+ // if this is the first value for the key, create a new accumulator
+ ACC accumulator = keyedMap.get(key);
+ if (accumulator == null) {
+ accumulator = aggFunction.createAccumulator();
+ keyedMap.put(key, accumulator);
+ }
+
+ // add() returns nothing, so the function must update the (already stored) accumulator in place
+ aggFunction.add(value, accumulator);
+ }
+
+ // ------------------------------------------------------------------------
+ // state merging
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected ACC mergeState(ACC a, ACC b) throws Exception {
+ return aggFunction.merge(a, b);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index b05b874..b4c2b8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.state.heap;
import org.apache.commons.io.IOUtils;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -50,12 +52,14 @@ import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,6 +173,15 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
+ public <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
+ TypeSerializer<N> namespaceSerializer,
+ AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
+ // the table obtained here stores accumulators (ACC), not aggregation results (R)
+ final StateTable<K, N, ACC> accumulatorTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
+ return new HeapAggregatingState<>(this, stateDesc, accumulatorTable, keySerializer, namespaceSerializer);
+ }
+
+ @Override
protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index 7804cb4..090a660 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -49,7 +49,7 @@ public class HeapReducingState<K, N, V>
* @param backend The state backend backing that created this state.
* @param stateDesc The state identifier for the state. This contains name
* and can create a default state value.
- * @param stateTable The state tab;e to use in this kev/value state. May contain initial state.
+ * @param stateTable The state table to use in this kev/value state. May contain initial state.
*/
public HeapReducingState(
KeyedStateBackend<K> backend,
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
new file mode 100644
index 0000000..15a8e31
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAggregatingState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.AggregatingState;
+
+/**
+ * The peer to the {@link AggregatingState} in the internal state type hierarchy.
+ *
+ * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
+ *
+ * @param <N> The type of the namespace.
+ * @param <IN> The type of the value added to the state.
+ * @param <OUT> The type of the value extracted from the state.
+ */
+public interface InternalAggregatingState<N, IN, OUT>
+ extends InternalMergingState<N, IN, OUT>, AggregatingState<IN, OUT> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
index 40e625c..76fa58f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
@@ -26,6 +26,6 @@ import org.apache.flink.api.common.state.ReducingState;
* <p>See {@link InternalKvState} for a description of the internal state hierarchy.
*
* @param <N> The type of the namespace
- * @param <T> The type of elements in the list
+ * @param <T> The type of the elements aggregated by the ReduceFunction
*/
public interface InternalReducingState<N, T> extends InternalMergingState<N, T, T>, ReducingState<T> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 2448540..66e8d02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -161,12 +161,13 @@ public class SerializationProxiesTest {
@Test
public void testFixTypeOrder() {
// ensure all elements are covered
- Assert.assertEquals(5, StateDescriptor.Type.values().length);
+ Assert.assertEquals(6, StateDescriptor.Type.values().length);
// fix the order of elements to keep serialization format stable
Assert.assertEquals(0, StateDescriptor.Type.UNKNOWN.ordinal());
Assert.assertEquals(1, StateDescriptor.Type.VALUE.ordinal());
Assert.assertEquals(2, StateDescriptor.Type.LIST.ordinal());
Assert.assertEquals(3, StateDescriptor.Type.REDUCING.ordinal());
Assert.assertEquals(4, StateDescriptor.Type.FOLDING.ordinal());
+ Assert.assertEquals(5, StateDescriptor.Type.AGGREGATING.ordinal()); // new types are appended, never inserted
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
new file mode 100644
index 0000000..a7ae5be
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the simple Java heap objects implementation of the {@link AggregatingState}.
+ */
+public class HeapAggregatingStateTest {
+
+ @Test
+ public void testAddAndGet() throws Exception {
+ // each key has its own accumulator: values added under one key must never show up under another
+ final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+ new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
+ stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+
+ try {
+ InternalAggregatingState<VoidNamespace, Long, Long> state =
+ keyedBackend.createAggregatingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+ state.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+ state.add(17L);
+ state.add(11L);
+ assertEquals(28L, state.get().longValue());
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertNull(state.get());
+ state.add(1L);
+ state.add(2L);
+
+ keyedBackend.setCurrentKey("def");
+ assertEquals(28L, state.get().longValue());
+ state.clear();
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ state.add(3L);
+ state.add(2L);
+ state.add(1L);
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertEquals(9L, state.get().longValue()); // 1 + 2 + 3 + 2 + 1, accumulated across both visits of "g"
+ state.clear();
+
+ // make sure all lists / maps are cleared
+ // (every key that received values has been cleared, so the backing state table must be empty)
+ StateTable<String, VoidNamespace, MutableLong> stateTable =
+ ((HeapAggregatingState<String, VoidNamespace, Long, MutableLong, Long>) state).stateTable;
+
+ assertTrue(stateTable.isEmpty());
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
+ public void testMerging() throws Exception {
+ // merges namespaces 2 and 3 into namespace 1, for keys whose values are distributed differently
+ final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr =
+ new AggregatingStateDescriptor<>("my-state", new AddingFunction(), MutableLong.class);
+ stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ final Integer namespace1 = 1;
+ final Integer namespace2 = 2;
+ final Integer namespace3 = 3;
+
+ final Long expectedResult = 165L; // 11 + 22 + 33 + 44 + 55
+
+ final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+
+ try {
+ InternalAggregatingState<Integer, Long, Long> state =
+ keyedBackend.createAggregatingState(IntSerializer.INSTANCE, stateDescr);
+
+ // populate the different namespaces
+ // - abc spreads the values over three namespaces
+ // - def spreads the values over two namespaces (one empty)
+ // - ghi is empty
+ // - jkl has all elements already in the target namespace
+ // - mno has all elements already in one source namespace
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.add(33L);
+ state.add(55L);
+
+ state.setCurrentNamespace(namespace2);
+ state.add(22L);
+ state.add(11L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(44L);
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(44L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(22L);
+ state.add(55L);
+ state.add(33L);
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace3);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("abc");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("def");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("ghi");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("jkl");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("mno");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ // make sure all lists / maps are cleared
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("ghi");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ StateTable<String, Integer, MutableLong> stateTable =
+ ((HeapAggregatingState<String, Integer, Long, MutableLong, Long>) state).stateTable;
+
+ assertTrue(stateTable.isEmpty());
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+ /** Creates a heap keyed state backend for {@code String} keys, covering key groups 0 to 15. */
+ private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
+ return new HeapKeyedStateBackend<>(
+ mock(TaskKvStateRegistry.class),
+ StringSerializer.INSTANCE,
+ HeapAggregatingStateTest.class.getClassLoader(),
+ 16,
+ new KeyGroupRange(0, 15));
+ }
+
+ // ------------------------------------------------------------------------
+ // test functions
+ // ------------------------------------------------------------------------
+ /** Aggregate function that sums {@code Long} values into a {@link MutableLong} accumulator. */
+ @SuppressWarnings("serial")
+ private static class AddingFunction implements AggregateFunction<Long, MutableLong, Long> {
+
+ @Override
+ public MutableLong createAccumulator() {
+ return new MutableLong();
+ }
+
+ @Override
+ public void add(Long value, MutableLong accumulator) {
+ accumulator.value += value;
+ }
+
+ @Override
+ public Long getResult(MutableLong accumulator) {
+ return accumulator.value;
+ }
+
+ @Override
+ public MutableLong merge(MutableLong a, MutableLong b) {
+ a.value += b.value;
+ return a;
+ }
+ }
+
+ private static final class MutableLong {
+ long value; // running sum maintained by AddingFunction
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 704875b..30e64c4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -20,10 +20,12 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -36,6 +38,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
@@ -63,6 +66,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A {@code WindowedStream} represents a data stream where elements are grouped by
* key, and for each key, the stream of elements is split into windows based on a
@@ -356,6 +361,10 @@ public class WindowedStream<T, K, W extends Window> {
return input.transform(opName, resultType, operator);
}
+ // ------------------------------------------------------------------------
+ // Fold Function
+ // ------------------------------------------------------------------------
+
/**
* Applies the given fold function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the reduce function is
@@ -500,6 +509,172 @@ public class WindowedStream<T, K, W extends Window> {
return input.transform(opName, resultType, operator);
}
+ // ------------------------------------------------------------------------
+ // Aggregation Function
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies the given aggregation function to each window. The aggregation function is called for
+ * each element, aggregating values incrementally and keeping the state to one accumulator
+ * per key and window. The accumulator and result types are extracted from the function.
+ *
+ * @param function The aggregation function.
+ * @return The data stream that is the result of applying the aggregation function to the window.
+ */
+ public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
+ checkNotNull(function, "function");
+ // fail fast, before attempting type extraction on an unsupported function
+ if (function instanceof RichFunction) {
+ throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
+ }
+
+ TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
+ function, input.getType(), null, false);
+
+ TypeInformation<R> resultType = TypeExtractor.getAggregateFunctionReturnType(
+ function, input.getType(), null, false);
+
+ return aggregate(function, accumulatorType, resultType);
+ }
+
+ /**
+ * Applies the given aggregation function to each window. The aggregation function is called for
+ * each element, aggregating values incrementally into one accumulator per key and window.
+ *
+ * @param function The aggregation function.
+ * @param accumulatorType Type information for the internal accumulator type of the aggregation function.
+ * @param resultType Type information for the result type of the aggregation function.
+ * @return The data stream that is the result of applying the aggregation function to the window.
+ */
+ public <ACC, R> SingleOutputStreamOperator<R> aggregate(
+ AggregateFunction<T, ACC, R> function,
+ TypeInformation<ACC> accumulatorType,
+ TypeInformation<R> resultType) {
+
+ checkNotNull(function, "function");
+ checkNotNull(accumulatorType, "accumulatorType");
+ checkNotNull(resultType, "resultType");
+
+ if (function instanceof RichFunction) {
+ throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
+ }
+ return aggregate(function, new PassThroughWindowFunction<K, W, R>(), accumulatorType, resultType);
+ }
+
+ /**
+ * Applies the given window function to each window, after the window's elements have been
+ * aggregated with the given aggregate function. The window function is called for each
+ * evaluation of the window for each key individually, and its output is interpreted as a
+ * regular non-windowed stream.
+ *
+ * <p>Because arriving data is aggregated incrementally, the window function typically has
+ * only a single value to process when it is called.
+ *
+ * @param aggFunction The aggregate function that is used for incremental aggregation.
+ * @param windowFunction The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ public <ACC, R> SingleOutputStreamOperator<R> aggregate(
+ AggregateFunction<T, ACC, R> aggFunction,
+ WindowFunction<R, R, K, W> windowFunction) {
+
+ checkNotNull(aggFunction, "aggFunction");
+ checkNotNull(windowFunction, "windowFunction");
+
+ final TypeInformation<T> inputType = input.getType();
+ final TypeInformation<ACC> accumulatorType =
+ TypeExtractor.getAggregateFunctionAccumulatorType(aggFunction, inputType, null, false);
+ final TypeInformation<R> resultType =
+ TypeExtractor.getAggregateFunctionReturnType(aggFunction, inputType, null, false);
+
+ return aggregate(aggFunction, windowFunction, accumulatorType, resultType);
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
+ * that the window function typically has only a single value to process when called.
+ *
+ * @param aggregateFunction The aggregation function that is used for incremental aggregation.
+ * @param windowFunction The window function.
+ * @param accumulatorType Type information for the internal accumulator type of the aggregation function.
+ * @param resultType Type information for the result type of the window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ * @throws UnsupportedOperationException If the aggregate function is a {@link RichFunction}.
+ */
+ public <ACC, R> SingleOutputStreamOperator<R> aggregate(
+ AggregateFunction<T, ACC, R> aggregateFunction,
+ WindowFunction<R, R, K, W> windowFunction,
+ TypeInformation<ACC> accumulatorType,
+ TypeInformation<R> resultType) {
+
+ checkNotNull(aggregateFunction, "aggregateFunction");
+ checkNotNull(windowFunction, "windowFunction");
+ checkNotNull(accumulatorType, "accumulatorType");
+ checkNotNull(resultType, "resultType");
+
+ if (aggregateFunction instanceof RichFunction) {
+ throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
+ }
+
+ // clean the closures before the functions are handed to the operator
+ windowFunction = input.getExecutionEnvironment().clean(windowFunction);
+ aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
+
+ String callLocation = Utils.getCallLocationName();
+ String udfName = "WindowedStream." + callLocation;
+
+ String opName;
+ KeySelector<T, K> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, R> operator;
+ // with an evictor, raw elements are buffered in list state and aggregated only when the window is evaluated
+ if (evictor != null) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+ ListStateDescriptor<StreamRecord<T>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+ operator = new EvictingWindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalIterableWindowFunction<>(new AggregateApplyWindowFunction<>(aggregateFunction, windowFunction)),
+ trigger,
+ evictor,
+ allowedLateness);
+
+ } else {
+ AggregatingStateDescriptor<T, ACC, R> stateDesc = new AggregatingStateDescriptor<>("window-contents",
+ aggregateFunction, accumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
+ // without an evictor, the aggregating state adds each element to the accumulator as it arrives
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+ operator = new WindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalSingleValueWindowFunction<>(windowFunction),
+ trigger,
+ allowedLateness);
+ }
+
+ return input.transform(opName, resultType, operator);
+ }
+
+ // ------------------------------------------------------------------------
+ // Window Function (apply)
+ // ------------------------------------------------------------------------
+
/**
* Applies the given window function to each window. The window function is called for each
* evaluation of the window for each key individually. The output of the window function is
@@ -792,7 +967,7 @@ public class WindowedStream<T, K, W extends Window> {
}
// ------------------------------------------------------------------------
- // Aggregations on the keyed windows
+ // Pre-defined aggregations on the keyed windows
// ------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
new file mode 100644
index 0000000..1b9fa88
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * An {@link AllWindowFunction} that first aggregates all elements of a window with an
+ * {@link AggregateFunction} and then passes the aggregation result, as the only element,
+ * on to the wrapped {@link AllWindowFunction}.
+ *
+ * @param <W> The type of the window.
+ * @param <T> The type of the elements in the window.
+ * @param <ACC> The type of the aggregation accumulator.
+ * @param <R> The type of the aggregation result, which is also the output type.
+ */
+@Internal
+public class AggregateApplyAllWindowFunction<W extends Window, T, ACC, R>
+ extends WrappingFunction<AllWindowFunction<R, R, W>>
+ implements AllWindowFunction<T, R, W> {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Aggregates the contents of a window before the wrapped function is invoked. */
+ private final AggregateFunction<T, ACC, R> aggFunction;
+
+ public AggregateApplyAllWindowFunction(
+ AggregateFunction<T, ACC, R> aggFunction,
+ AllWindowFunction<R, R, W> windowFunction) {
+
+ super(windowFunction);
+ this.aggFunction = aggFunction;
+ }
+
+ @Override
+ public void apply(W window, Iterable<T> values, Collector<R> out) throws Exception {
+ // a fresh accumulator for every invocation, so nothing carries over between windows
+ final ACC accumulator = aggFunction.createAccumulator();
+ for (T element : values) {
+ aggFunction.add(element, accumulator);
+ }
+
+ final R result = aggFunction.getResult(accumulator);
+ wrappedFunction.apply(window, Collections.singletonList(result), out);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09380e49/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
new file mode 100644
index 0000000..5200bc2
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * A {@link WindowFunction} that first aggregates all elements of a window with an
+ * {@link AggregateFunction} and then passes the aggregation result, as the only element,
+ * on to the wrapped {@link WindowFunction}.
+ *
+ * @param <K> The type of the key.
+ * @param <W> The type of the window.
+ * @param <T> The type of the elements in the window.
+ * @param <ACC> The type of the aggregation accumulator.
+ * @param <R> The type of the aggregation result, which is also the output type.
+ */
+@Internal
+public class AggregateApplyWindowFunction<K, W extends Window, T, ACC, R>
+ extends WrappingFunction<WindowFunction<R, R, K, W>>
+ implements WindowFunction<T, R, K, W> {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Aggregates the contents of a window before the wrapped function is invoked. */
+ private final AggregateFunction<T, ACC, R> aggFunction;
+
+ public AggregateApplyWindowFunction(
+ AggregateFunction<T, ACC, R> aggFunction,
+ WindowFunction<R, R, K, W> windowFunction) {
+
+ super(windowFunction);
+ this.aggFunction = aggFunction;
+ }
+
+ @Override
+ public void apply(K key, W window, Iterable<T> values, Collector<R> out) throws Exception {
+ // a fresh accumulator for every invocation, so nothing carries over between windows
+ final ACC accumulator = aggFunction.createAccumulator();
+ for (T element : values) {
+ aggFunction.add(element, accumulator);
+ }
+
+ final R result = aggFunction.getResult(accumulator);
+ wrappedFunction.apply(key, window, Collections.singletonList(result), out);
+ }
+}