You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/22 22:12:13 UTC
[6/9] flink git commit: [FLINK-5590] [runtime] Add proper internal
state hierarchy
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index 9552325..f0eb53e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.Preconditions;
import java.io.ByteArrayOutputStream;
@@ -39,8 +40,8 @@ import java.util.Map;
* @param <V> The type of the value.
*/
public class HeapListState<K, N, V>
- extends AbstractHeapState<K, N, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
- implements ListState<V> {
+ extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
+ implements InternalListState<N, V> {
/**
* Creates a new key/value state for the given hash map of key/value pairs.
@@ -59,6 +60,10 @@ public class HeapListState<K, N, V>
super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
}
+ // ------------------------------------------------------------------------
+ // state access
+ // ------------------------------------------------------------------------
+
@Override
public Iterable<V> get() {
Preconditions.checkState(currentNamespace != null, "No namespace set.");
@@ -154,4 +159,14 @@ public class HeapListState<K, N, V>
return baos.toByteArray();
}
+
+ // ------------------------------------------------------------------------
+ // state merging
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) {
+ a.addAll(b);
+ return a;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
index 37aa812..7804cb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -37,8 +38,8 @@ import java.util.Map;
* @param <V> The type of the value.
*/
public class HeapReducingState<K, N, V>
- extends AbstractHeapState<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>>
- implements ReducingState<V> {
+ extends AbstractHeapMergingState<K, N, V, V, V, ReducingState<V>, ReducingStateDescriptor<V>>
+ implements InternalReducingState<N, V> {
private final ReduceFunction<V> reduceFunction;
@@ -56,10 +57,15 @@ public class HeapReducingState<K, N, V>
StateTable<K, N, V> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
+
super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
this.reduceFunction = stateDesc.getReduceFunction();
}
+ // ------------------------------------------------------------------------
+ // state access
+ // ------------------------------------------------------------------------
+
@Override
public V get() {
Preconditions.checkState(currentNamespace != null, "No namespace set.");
@@ -111,13 +117,22 @@ public class HeapReducingState<K, N, V>
if (currentValue == null) {
// we're good, just added the new value
} else {
- V reducedValue = null;
+ V reducedValue;
try {
reducedValue = reduceFunction.reduce(currentValue, value);
} catch (Exception e) {
- throw new RuntimeException("Could not add value to reducing state.", e);
+ throw new IOException("Exception while applying ReduceFunction in reducing state", e);
}
keyedMap.put(backend.<K>getCurrentKey(), reducedValue);
}
}
+
+ // ------------------------------------------------------------------------
+ // state merging
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected V mergeState(V a, V b) throws Exception {
+ return reduceFunction.reduce(a, b);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
index cccaacb..9e042fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.Preconditions;
import java.util.Map;
@@ -36,7 +37,7 @@ import java.util.Map;
*/
public class HeapValueState<K, N, V>
extends AbstractHeapState<K, N, V, ValueState<V>, ValueStateDescriptor<V>>
- implements ValueState<V> {
+ implements InternalValueState<N, V> {
/**
* Creates a new key/value state for the given hash map of key/value pairs.
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 797150a..9d7232e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.runtime.state.heap;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -78,4 +79,21 @@ public class StateTable<K, N, ST> {
public List<Map<N, Map<K, ST>>> getState() {
return state;
}
+
+ // ------------------------------------------------------------------------
+ // for testing
+ // ------------------------------------------------------------------------
+
+ @VisibleForTesting
+ boolean isEmpty() {
+ for (Map<N, Map<K, ST>> map : state) {
+ if (map != null) {
+ if (!map.isEmpty()) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
new file mode 100644
index 0000000..ae9f457
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalAppendingState.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.AppendingState;
+
+/**
+ * The peer to the {@link AppendingState} in the internal state type hierarchy.
+ *
+ * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
+ *
+ * @param <N> The type of the namespace
+ * @param <IN> The type of elements added to the state
+ * @param <OUT> The type of the
+ */
+public interface InternalAppendingState<N, IN, OUT> extends InternalKvState<N>, AppendingState<IN, OUT> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
new file mode 100644
index 0000000..eb58ce5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalFoldingState.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.FoldingState;
+
+/**
+ * The peer to the {@link FoldingState} in the internal state type hierarchy.
+ *
+ * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
+ *
+ * @param <N> The type of the namespace
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ */
+public interface InternalFoldingState<N, T, ACC> extends InternalAppendingState<N, T, ACC>, FoldingState<T, ACC> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
new file mode 100644
index 0000000..06f64b6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalKvState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.State;
+
+/**
+ * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the
+ * {@link State} being the root of the public API state hierarchy.
+ *
+ * <p>The internal state classes give access to the namespace getters and setters and access to
+ * additional functionality, like raw value access or state merging.
+ *
+ * <p>The public API state hierarchy is intended to be programmed against by Flink applications.
+ * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not
+ * intended to be used by user applications. These internal methods are considered of limited use to users and
+ * only confusing, and are usually not regarded as stable across releases.
+ *
+ * <p>Each specific type in the internal state hierarchy extends the type from the public
+ * state hierarchy:
+ *
+ * <pre>
+ * State
+ * |
+ * +-------------------InternalKvState
+ * | |
+ * MergingState |
+ * | |
+ * +-----------------InternalMergingState
+ * | |
+ * +--------+------+ |
+ * | | |
+ * ReducingState ListState +-----+-----------------+
+ * | | | |
+ * +-----------+ +----------- -----------------InternalListState
+ * | |
+ * +---------InternalReducingState
+ * </pre>
+ *
+ * @param <N> The type of the namespace.
+ */
+public interface InternalKvState<N> extends State {
+
+ /**
+ * Sets the current namespace, which will be used when using the state access methods.
+ *
+ * @param namespace The namespace.
+ */
+ void setCurrentNamespace(N namespace);
+
+ /**
+ * Returns the serialized value for the given key and namespace.
+ *
+ * <p>If no value is associated with key and namespace, <code>null</code>
+ * is returned.
+ *
+ * @param serializedKeyAndNamespace Serialized key and namespace
+ * @return Serialized value or <code>null</code> if no value is associated with the key and namespace.
+ *
+ * @throws Exception Exceptions during serialization are forwarded
+ */
+ byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
new file mode 100644
index 0000000..ae392ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalListState.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.ListState;
+
+/**
+ * The peer to the {@link ListState} in the internal state type hierarchy.
+ *
+ * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
+ *
+ * @param <N> The type of the namespace
+ * @param <T> The type of elements in the list
+ */
+public interface InternalListState<N, T> extends InternalMergingState<N, T, Iterable<T>>, ListState<T> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
new file mode 100644
index 0000000..abc7d7c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMergingState.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.MergingState;
+
+import java.util.Collection;
+
+/**
+ * The peer to the {@link MergingState} in the internal state type hierarchy.
+ *
+ * See {@link InternalKvState} for a description of the internal state hierarchy.
+ *
+ * @param <N> The type of the namespace
+ * @param <IN> The type of elements added to the state
+ * @param <OUT> The type of elements
+ */
+public interface InternalMergingState<N, IN, OUT> extends InternalAppendingState<N, IN, OUT>, MergingState<IN, OUT> {
+
+ /**
+ * Merges the state of the current key for the given source namespaces into the state of
+ * the target namespace.
+ *
+ * @param target The target namespace where the merged state should be stored.
+ * @param sources The source namespaces whose state should be merged.
+ *
+ * @throws Exception The method may forward exception thrown internally (by I/O or functions).
+ */
+ void mergeNamespaces(N target, Collection<N> sources) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
new file mode 100644
index 0000000..40e625c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalReducingState.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.ReducingState;
+
+/**
+ * The peer to the {@link ReducingState} in the internal state type hierarchy.
+ *
+ * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
+ *
+ * @param <N> The type of the namespace
+ * @param <T> The type of elements in the list
+ */
+public interface InternalReducingState<N, T> extends InternalMergingState<N, T, T>, ReducingState<T> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
new file mode 100644
index 0000000..7177b8a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalValueState.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.internal;
+
+import org.apache.flink.api.common.state.ValueState;
+
+/**
+ * The peer to the {@link ValueState} in the internal state type hierarchy.
+ *
+ * <p>See {@link InternalKvState} for a description of the internal state hierarchy.
+ *
+ * @param <N> The type of the namespace
+ * @param <T> The type of elements in the list
+ */
+public interface InternalValueState<N, T> extends InternalKvState<N>, ValueState<T> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index 86f8766..1c02ca1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -43,8 +43,8 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -593,7 +593,7 @@ public class KvStateClientTest {
state.update(201 + i);
// we know it must be a KvStat but this is not exposed to the user via State
- KvState<?> kvState = (KvState<?>) state;
+ InternalKvState<?> kvState = (InternalKvState<?>) state;
// Register KvState (one state instance for all server)
ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index b1ec86f..202024c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -40,9 +40,9 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -267,7 +267,7 @@ public class KvStateServerHandlerTest extends TestLogger {
}
/**
- * Tests the failure response on a failure on the {@link KvState#getSerializedValue(byte[])}
+ * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])}
* call.
*/
@Test
@@ -279,7 +279,7 @@ public class KvStateServerHandlerTest extends TestLogger {
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
// Failing KvState
- KvState<?> kvState = mock(KvState.class);
+ InternalKvState<?> kvState = mock(InternalKvState.class);
when(kvState.getSerializedValue(any(byte[].class)))
.thenThrow(new RuntimeException("Expected test Exception"));
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index 0d9c2e4..69dbe6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.query.netty.message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
-import org.apache.flink.api.common.state.ListState;
+
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -30,10 +30,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
import org.junit.Test;
import java.io.IOException;
@@ -315,7 +317,7 @@ public class KvStateRequestSerializerTest {
*/
@Test
public void testListSerialization() throws Exception {
- final long key = 0l;
+ final long key = 0L;
// objects for heap state list serialisation
final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
@@ -327,9 +329,10 @@ public class KvStateRequestSerializerTest {
);
longHeapKeyedStateBackend.setCurrentKey(key);
- final ListState<Long> listState = longHeapKeyedStateBackend
- .createListState(VoidNamespaceSerializer.INSTANCE,
+ final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState(
+ VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+
testListSerialization(key, listState);
}
@@ -340,19 +343,16 @@ public class KvStateRequestSerializerTest {
* @param key
* key of the list state
* @param listState
- * list state using the {@link VoidNamespace}, must also be a {@link
- * KvState} instance
+ * list state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance
*
* @throws Exception
*/
- public static void testListSerialization(final long key,
- final ListState<Long> listState) throws Exception {
+ public static void testListSerialization(
+ final long key,
+ final InternalListState<VoidNamespace, Long> listState) throws Exception {
TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
-
- final KvState<VoidNamespace> listKvState =
- (KvState<VoidNamespace>) listState;
- listKvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+ listState.setCurrentNamespace(VoidNamespace.INSTANCE);
// List
final int numElements = 10;
@@ -368,8 +368,8 @@ public class KvStateRequestSerializerTest {
KvStateRequestSerializer.serializeKeyAndNamespace(
key, LongSerializer.INSTANCE,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
- final byte[] serializedValues =
- listKvState.getSerializedValue(serializedKey);
+
+ final byte[] serializedValues = listState.getSerializedValue(serializedKey);
List<Long> actualValues = KvStateRequestSerializer.deserializeList(serializedValues, valueSerializer);
assertEquals(expectedValues, actualValues);
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 38e04aa..c560ab0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.heap.AbstractHeapState;
import org.apache.flink.runtime.state.heap.StateTable;
+import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -167,7 +168,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+ InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
// some modifications to the state
backend.setCurrentKey(1);
@@ -214,7 +215,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ValueState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- KvState<VoidNamespace> restoredKvState1 = (KvState<VoidNamespace>) restored1;
+ InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
backend.setCurrentKey(1);
assertEquals("1", restored1.value());
@@ -230,7 +231,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ValueState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- KvState<VoidNamespace> restoredKvState2 = (KvState<VoidNamespace>) restored2;
+ InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
backend.setCurrentKey(1);
assertEquals("u1", restored2.value());
@@ -246,7 +247,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
/**
- * Tests {@link ValueState#value()} and {@link KvState#getSerializedValue(byte[])}
+ * Tests {@link ValueState#value()} and {@link InternalKvState#getSerializedValue(byte[])}
* accessing the state concurrently. They should not get in the way of each
* other.
*/
@@ -255,7 +256,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
public void testValueStateRace() throws Exception {
final AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE);
- final Integer namespace = Integer.valueOf(1);
+ final Integer namespace = 1;
final ValueStateDescriptor<String> kvId =
new ValueStateDescriptor<>("id", String.class);
@@ -270,7 +271,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
.getPartitionedState(namespace, IntSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- final KvState<Integer> kvState = (KvState<Integer>) state;
+ final InternalKvState<Integer> kvState = (InternalKvState<Integer>) state;
/**
* 1) Test that ValueState#value() before and after
@@ -496,7 +497,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+ InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
Joiner joiner = Joiner.on(",");
// some modifications to the state
@@ -544,7 +545,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ListState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- KvState<VoidNamespace> restoredKvState1 = (KvState<VoidNamespace>) restored1;
+ InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
backend.setCurrentKey(1);
assertEquals("1", joiner.join(restored1.get()));
@@ -560,7 +561,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ListState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- KvState<VoidNamespace> restoredKvState2 = (KvState<VoidNamespace>) restored2;
+ InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
backend.setCurrentKey(1);
assertEquals("1,u1", joiner.join(restored2.get()));
@@ -596,7 +597,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+ InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
// some modifications to the state
backend.setCurrentKey(1);
@@ -643,7 +644,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ReducingState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- KvState<VoidNamespace> restoredKvState1 = (KvState<VoidNamespace>) restored1;
+ InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
backend.setCurrentKey(1);
assertEquals("1", restored1.get());
@@ -659,7 +660,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
ReducingState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- KvState<VoidNamespace> restoredKvState2 = (KvState<VoidNamespace>) restored2;
+ InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
backend.setCurrentKey(1);
assertEquals("1,u1", restored2.get());
@@ -698,7 +699,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
FoldingState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+ InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
// some modifications to the state
backend.setCurrentKey(1);
@@ -746,7 +747,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
FoldingState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- KvState<VoidNamespace> restoredKvState1 = (KvState<VoidNamespace>) restored1;
+ InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
backend.setCurrentKey(1);
assertEquals("Fold-Initial:,1", restored1.get());
@@ -763,7 +764,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
@SuppressWarnings("unchecked")
FoldingState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
- KvState<VoidNamespace> restoredKvState2 = (KvState<VoidNamespace>) restored2;
+ InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
backend.setCurrentKey(1);
assertEquals("Fold-Initial:,101", restored2.get());
@@ -1254,7 +1255,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
VoidNamespaceSerializer.INSTANCE,
desc);
- KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+ InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
assertTrue(kvState instanceof AbstractHeapState);
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -1280,7 +1281,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
VoidNamespaceSerializer.INSTANCE,
desc);
- KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+ InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
assertTrue(kvState instanceof AbstractHeapState);
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -1311,7 +1312,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
VoidNamespaceSerializer.INSTANCE,
desc);
- KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+ InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
assertTrue(kvState instanceof AbstractHeapState);
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -1342,7 +1343,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
VoidNamespaceSerializer.INSTANCE,
desc);
- KvState<VoidNamespace> kvState = (KvState<VoidNamespace>) state;
+ InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
assertTrue(kvState instanceof AbstractHeapState);
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -1451,7 +1452,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
* if it is not null.
*/
private static <V, K, N> V getSerializedValue(
- KvState<N> kvState,
+ InternalKvState<N> kvState,
K key,
TypeSerializer<K> keySerializer,
N namespace,
@@ -1475,7 +1476,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
* if it is not null.
*/
private static <V, K, N> List<V> getSerializedList(
- KvState<N> kvState,
+ InternalKvState<N> kvState,
K key,
TypeSerializer<K> keySerializer,
N namespace,
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
new file mode 100644
index 0000000..33d60a0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the simple Java heap objects implementation of the {@link ListState}.
+ */
+public class HeapListStateTest {
+
+ @Test
+ public void testAddAndGet() throws Exception {
+
+ final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+ stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+
+ try {
+ InternalListState<VoidNamespace, Long> state =
+ keyedBackend.createListState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+ state.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+ state.add(17L);
+ state.add(11L);
+ assertEquals(asList(17L, 11L), state.get());
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertNull(state.get());
+ state.add(1L);
+ state.add(2L);
+
+ keyedBackend.setCurrentKey("def");
+ assertEquals(asList(17L, 11L), state.get());
+ state.clear();
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ state.add(3L);
+ state.add(2L);
+ state.add(1L);
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertEquals(asList(1L, 2L, 3L, 2L, 1L), state.get());
+ state.clear();
+
+ // make sure all lists / maps are cleared
+
+ StateTable<String, VoidNamespace, ArrayList<Long>> stateTable =
+ ((HeapListState<String, VoidNamespace, Long>) state).stateTable;
+
+ assertTrue(stateTable.isEmpty());
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
+ public void testMerging() throws Exception {
+
+ final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+ stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ final Integer namespace1 = 1;
+ final Integer namespace2 = 2;
+ final Integer namespace3 = 3;
+
+ final Set<Long> expectedResult = new HashSet<>(asList(11L, 22L, 33L, 44L, 55L));
+
+ final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+
+ try {
+ InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+
+ // populate the different namespaces
+ // - abc spreads the values over three namespaces
+ // - def spreads teh values over two namespaces (one empty)
+ // - ghi is empty
+ // - jkl has all elements already in the target namespace
+ // - mno has all elements already in one source namespace
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.add(33L);
+ state.add(55L);
+
+ state.setCurrentNamespace(namespace2);
+ state.add(22L);
+ state.add(11L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(44L);
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(44L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(22L);
+ state.add(55L);
+ state.add(33L);
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace3);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("abc");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ validateResult(state.get(), expectedResult);
+
+ keyedBackend.setCurrentKey("def");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ validateResult(state.get(), expectedResult);
+
+ keyedBackend.setCurrentKey("ghi");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("jkl");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ validateResult(state.get(), expectedResult);
+
+ keyedBackend.setCurrentKey("mno");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ validateResult(state.get(), expectedResult);
+
+ // make sure all lists / maps are cleared
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("ghi");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ StateTable<String, Integer, ArrayList<Long>> stateTable =
+ ((HeapListState<String, Integer, Long>) state).stateTable;
+
+ assertTrue(stateTable.isEmpty());
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
+ return new HeapKeyedStateBackend<>(
+ mock(TaskKvStateRegistry.class),
+ StringSerializer.INSTANCE,
+ HeapListStateTest.class.getClassLoader(),
+ 16,
+ new KeyGroupRange(0, 15));
+ }
+
+ private static <T> void validateResult(Iterable<T> values, Set<T> expected) {
+ int num = 0;
+ for (T v : values) {
+ num++;
+ assertTrue(expected.contains(v));
+ }
+
+ assertEquals(expected.size(), num);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
new file mode 100644
index 0000000..e0929f1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import org.junit.Test;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the simple Java heap objects implementation of the {@link ReducingState}.
+ */
+public class HeapReducingStateTest {
+
+ @Test
+ public void testAddAndGet() throws Exception {
+
+ final ReducingStateDescriptor<Long> stateDescr =
+ new ReducingStateDescriptor<>("my-state", new AddingFunction(), Long.class);
+ stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+
+ try {
+ InternalReducingState<VoidNamespace, Long> state =
+ keyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, stateDescr);
+ state.setCurrentNamespace(VoidNamespace.INSTANCE);
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+ state.add(17L);
+ state.add(11L);
+ assertEquals(28L, state.get().longValue());
+
+ keyedBackend.setCurrentKey("abc");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertNull(state.get());
+ state.add(1L);
+ state.add(2L);
+
+ keyedBackend.setCurrentKey("def");
+ assertEquals(28L, state.get().longValue());
+ state.clear();
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ state.add(3L);
+ state.add(2L);
+ state.add(1L);
+
+ keyedBackend.setCurrentKey("def");
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("g");
+ assertEquals(9L, state.get().longValue());
+ state.clear();
+
+ // make sure all lists / maps are cleared
+
+ StateTable<String, VoidNamespace, Long> stateTable =
+ ((HeapReducingState<String, VoidNamespace, Long>) state).stateTable;
+
+ assertTrue(stateTable.isEmpty());
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ @Test
+ public void testMerging() throws Exception {
+
+ final ReducingStateDescriptor<Long> stateDescr = new ReducingStateDescriptor<>(
+ "my-state", new AddingFunction(), Long.class);
+ stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ final Integer namespace1 = 1;
+ final Integer namespace2 = 2;
+ final Integer namespace3 = 3;
+
+ final Long expectedResult = 165L;
+
+ final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+
+ try {
+ final InternalReducingState<Integer, Long> state =
+ keyedBackend.createReducingState(IntSerializer.INSTANCE, stateDescr);
+
+ // populate the different namespaces
+ // - abc spreads the values over three namespaces
+ // - def spreads teh values over two namespaces (one empty)
+ // - ghi is empty
+ // - jkl has all elements already in the target namespace
+ // - mno has all elements already in one source namespace
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.add(33L);
+ state.add(55L);
+
+ state.setCurrentNamespace(namespace2);
+ state.add(22L);
+ state.add(11L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(44L);
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(44L);
+
+ state.setCurrentNamespace(namespace3);
+ state.add(22L);
+ state.add(55L);
+ state.add(33L);
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace3);
+ state.add(11L);
+ state.add(22L);
+ state.add(33L);
+ state.add(44L);
+ state.add(55L);
+
+ keyedBackend.setCurrentKey("abc");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("def");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("ghi");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertNull(state.get());
+
+ keyedBackend.setCurrentKey("jkl");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ keyedBackend.setCurrentKey("mno");
+ state.mergeNamespaces(namespace1, asList(namespace2, namespace3));
+ state.setCurrentNamespace(namespace1);
+ assertEquals(expectedResult, state.get());
+
+ // make sure all lists / maps are cleared
+
+ keyedBackend.setCurrentKey("abc");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("def");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("ghi");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("jkl");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ keyedBackend.setCurrentKey("mno");
+ state.setCurrentNamespace(namespace1);
+ state.clear();
+
+ StateTable<String, Integer, Long> stateTable =
+ ((HeapReducingState<String, Integer, Long>) state).stateTable;
+
+ assertTrue(stateTable.isEmpty());
+ }
+ finally {
+ keyedBackend.close();
+ keyedBackend.dispose();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
+ return new HeapKeyedStateBackend<>(
+ mock(TaskKvStateRegistry.class),
+ StringSerializer.INSTANCE,
+ HeapReducingStateTest.class.getClassLoader(),
+ 16,
+ new KeyGroupRange(0, 15));
+ }
+
+ // ------------------------------------------------------------------------
+ // test functions
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("serial")
+ private static class AddingFunction implements ReduceFunction<Long> {
+
+ @Override
+ public Long reduce(Long a, Long b) {
+ return a + b;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a21660c..6bb0a40 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -494,15 +494,34 @@ public abstract class AbstractStreamOperator<OUT>
return getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
}
+
+ protected <N, S extends State, T> S getOrCreateKeyedState(
+ TypeSerializer<N> namespaceSerializer,
+ StateDescriptor<S, T> stateDescriptor) throws Exception {
+
+ if (keyedStateStore != null) {
+ return keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
+ }
+ else {
+ throw new IllegalStateException("Cannot create partitioned state. " +
+ "The keyed state backend has not been set." +
+ "This indicates that the operator is not partitioned/keyed.");
+ }
+ }
+
/**
* Creates a partitioned state handle, using the state backend configured for this task.
+ *
+ * TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace.
+ * This method should be removed for the sake of namespaces being lazily fetched from the keyed
+ * state backend, or being set on the state directly.
*
* @throws IllegalStateException Thrown, if the key/value state was already initialized.
* @throws Exception Thrown, if the state backend cannot create the key/value state.
*/
- @SuppressWarnings("unchecked")
protected <S extends State, N> S getPartitionedState(
- N namespace, TypeSerializer<N> namespaceSerializer,
+ N namespace,
+ TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, ?> stateDescriptor) throws Exception {
if (keyedStateStore != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 0d5d091..05c89dd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -35,7 +35,7 @@ import java.util.Set;
/**
* A {@link Window} that represents a time interval from {@code start} (inclusive) to
- * {@code start + size} (exclusive).
+ * {@code end} (exclusive).
*/
@PublicEvolving
public class TimeWindow extends Window {
@@ -48,14 +48,35 @@ public class TimeWindow extends Window {
this.end = end;
}
+ /**
+ * Gets the starting timestamp of the window. This is the first timestamp that belongs
+ * to this window.
+ *
+ * @return The starting timestamp of this window.
+ */
public long getStart() {
return start;
}
+ /**
+ * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it
+ * is the first timestamp that does not belong to this window any more.
+ *
+ * @return The exclusive end timestamp of this window.
+ */
public long getEnd() {
return end;
}
+ /**
+ * Gets the largest timestamp that still belongs to this window.
+ *
+ * <p>This timestamp is identical to {@code getEnd() - 1}.
+ *
+ * @return The largest timestamp that still belongs to this window.
+ *
+ * @see #getEnd()
+ */
@Override
public long maxTimestamp() {
return end - 1;
@@ -104,6 +125,13 @@ public class TimeWindow extends Window {
return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
}
+ // ------------------------------------------------------------------------
+ // Serializer
+ // ------------------------------------------------------------------------
+
+ /**
+ * The serializer used to write the TimeWindow type.
+ */
public static class Serializer extends TypeSerializer<TimeWindow> {
private static final long serialVersionUID = 1L;
@@ -152,9 +180,7 @@ public class TimeWindow extends Window {
@Override
public TimeWindow deserialize(TimeWindow reuse, DataInputView source) throws IOException {
- long start = source.readLong();
- long end = source.readLong();
- return new TimeWindow(start, end);
+ return deserialize(source);
}
@Override
@@ -179,6 +205,10 @@ public class TimeWindow extends Window {
}
}
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
/**
* Merge overlapping {@link TimeWindow}s. For use by merging
* {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}.
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
index 0e131ff..b17989c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/Window.java
@@ -31,5 +31,10 @@ import org.apache.flink.annotation.PublicEvolving;
@PublicEvolving
public abstract class Window {
+ /**
+ * Gets the largest timestamp that still belongs to this window.
+ *
+ * @return The largest timestamp that still belongs to this window.
+ */
public abstract long maxTimestamp();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b97128f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 8c73878..d9c977a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -20,15 +20,16 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
+
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.AppendingState;
-import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -41,7 +42,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.Collection;
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A {@link WindowOperator} that also allows an {@link Evictor} to be used.
@@ -56,40 +57,52 @@ import static java.util.Objects.requireNonNull;
* @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
*/
@Internal
-public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
+public class EvictingWindowOperator<K, IN, OUT, W extends Window>
+ extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
private static final long serialVersionUID = 1L;
+ // ------------------------------------------------------------------------
+ // these fields are set by the API stream graph builder to configure the operator
+
private final Evictor<? super IN, ? super W> evictor;
+ private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> evictingWindowStateDescriptor;
+
+ // ------------------------------------------------------------------------
+ // the fields below are instantiated once the operator runs in the runtime
+
private transient EvictorContext evictorContext;
- private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;
+ private transient InternalListState<W, StreamRecord<IN>> evictingWindowState;
+
+ // ------------------------------------------------------------------------
public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner,
- TypeSerializer<W> windowSerializer,
- KeySelector<IN, K> keySelector,
- TypeSerializer<K> keySerializer,
- StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor,
- InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
- Trigger<? super IN, ? super W> trigger,
- Evictor<? super IN, ? super W> evictor,
- long allowedLateness) {
+ TypeSerializer<W> windowSerializer,
+ KeySelector<IN, K> keySelector,
+ TypeSerializer<K> keySerializer,
+ StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor,
+ InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
+ Trigger<? super IN, ? super W> trigger,
+ Evictor<? super IN, ? super W> evictor,
+ long allowedLateness) {
super(windowAssigner, windowSerializer, keySelector,
keySerializer, null, windowFunction, trigger, allowedLateness);
- this.evictor = requireNonNull(evictor);
- this.windowStateDescriptor = windowStateDescriptor;
+
+ this.evictor = checkNotNull(evictor);
+ this.evictingWindowStateDescriptor = checkNotNull(windowStateDescriptor);
}
@Override
- @SuppressWarnings("unchecked")
public void processElement(StreamRecord<IN> element) throws Exception {
Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(),
element.getTimestamp(),
windowAssignerContext);
+ @SuppressWarnings("unchecked")
final K key = (K) getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
@@ -119,11 +132,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
}
// merge the merged state windows into the newly resulting state window
- getKeyedStateBackend().mergePartitionedStates(
- stateWindowResult,
- mergedStateWindows,
- windowSerializer,
- (StateDescriptor<? extends MergingState<?, ?>, ?>) windowStateDescriptor);
+ evictingWindowState.mergeNamespaces(stateWindowResult, mergedStateWindows);
}
});
@@ -137,9 +146,9 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
if (stateWindow == null) {
throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
}
- ListState<StreamRecord<IN>> windowState = getPartitionedState(
- stateWindow, windowSerializer, windowStateDescriptor);
- windowState.add(element);
+
+ evictingWindowState.setCurrentNamespace(stateWindow);
+ evictingWindowState.add(element);
context.key = key;
context.window = actualWindow;
@@ -149,16 +158,16 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
TriggerResult triggerResult = context.onElement(element);
if (triggerResult.isFire()) {
- Iterable<StreamRecord<IN>> contents = windowState.get();
+ Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}
- fire(actualWindow, contents, windowState);
+ fire(actualWindow, contents, evictingWindowState);
}
if (triggerResult.isPurge()) {
- cleanup(actualWindow, windowState, mergingWindows);
+ cleanup(actualWindow, evictingWindowState, mergingWindows);
} else {
registerCleanupTimer(actualWindow);
}
@@ -173,9 +182,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
continue;
}
- ListState<StreamRecord<IN>> windowState = getPartitionedState(
- window, windowSerializer, windowStateDescriptor);
- windowState.add(element);
+ evictingWindowState.setCurrentNamespace(window);
+ evictingWindowState.add(element);
context.key = key;
context.window = window;
@@ -185,16 +193,16 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
TriggerResult triggerResult = context.onElement(element);
if (triggerResult.isFire()) {
- Iterable<StreamRecord<IN>> contents = windowState.get();
+ Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}
- fire(window, contents, windowState);
+ fire(window, contents, evictingWindowState);
}
if (triggerResult.isPurge()) {
- cleanup(window, windowState, null);
+ cleanup(window, evictingWindowState, null);
} else {
registerCleanupTimer(window);
}
@@ -222,15 +230,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
// so it is safe to just ignore
return;
}
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+
+ evictingWindowState.setCurrentNamespace(stateWindow);
} else {
- windowState = getPartitionedState(
- context.window,
- windowSerializer,
- windowStateDescriptor);
+ evictingWindowState.setCurrentNamespace(context.window);
}
- Iterable<StreamRecord<IN>> contents = windowState.get();
+ Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
return;
@@ -238,11 +244,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
- fire(context.window, contents, windowState);
+ fire(context.window, contents, evictingWindowState);
}
if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
- cleanup(context.window, windowState, mergingWindows);
+ cleanup(context.window, evictingWindowState, mergingWindows);
}
}
@@ -265,12 +271,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
// so it is safe to just ignore
return;
}
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ evictingWindowState.setCurrentNamespace(stateWindow);
} else {
- windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+ evictingWindowState.setCurrentNamespace(context.window);
}
- Iterable<StreamRecord<IN>> contents = windowState.get();
+ Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
return;
@@ -278,11 +284,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
if (triggerResult.isFire()) {
- fire(context.window, contents, windowState);
+ fire(context.window, contents, evictingWindowState);
}
if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
- cleanup(context.window, windowState, mergingWindows);
+ cleanup(context.window, evictingWindowState, mergingWindows);
}
}
@@ -381,7 +387,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
@Override
public void open() throws Exception {
super.open();
+
evictorContext = new EvictorContext(null,null);
+ evictingWindowState = (InternalListState<W, StreamRecord<IN>>)
+ getOrCreateKeyedState(windowSerializer, evictingWindowStateDescriptor);
}
@Override
@@ -409,6 +418,6 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
@VisibleForTesting
@SuppressWarnings("unchecked, rawtypes")
public StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?> getStateDescriptor() {
- return (StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?>) windowStateDescriptor;
+ return (StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?>) evictingWindowStateDescriptor;
}
}