You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:32 UTC
[05/24] flink git commit: [FLINK-2808] [streaming] Refactor and
extend state backend abstraction
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
new file mode 100644
index 0000000..7b8dbd5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.state.StateHandle;
+
+/**
+ * List of task states for a chain of streaming tasks.
+ */
+public class StreamTaskStateList implements StateHandle<StreamTaskState[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The states for all operators */
+ private final StreamTaskState[] states;
+
+
+ public StreamTaskStateList(StreamTaskState[] states) {
+ this.states = states;
+ }
+
+ public boolean isEmpty() {
+ for (StreamTaskState state : states) {
+ if (state != null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public StreamTaskState[] getState(ClassLoader userCodeClassLoader) {
+ return states;
+ }
+
+ @Override
+ public void discardState() throws Exception {
+ for (StreamTaskState state : states) {
+ if (state != null) {
+ state.discardState();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
deleted file mode 100644
index a8c4b49..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.streaming.api.state.PartitionedStreamOperatorState;
-import org.apache.flink.streaming.api.state.StreamOperatorState;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Implementation of the {@link RuntimeContext}, created by runtime stream UDF
- * operators.
- */
-public class StreamingRuntimeContext extends RuntimeUDFContext {
-
- private final Environment env;
- private final Map<String, StreamOperatorState<?, ?>> states;
- private final List<PartitionedStreamOperatorState<?, ?, ?>> partitionedStates;
- private final KeySelector<?, ?> statePartitioner;
- private final StateHandleProvider<Serializable> provider;
-
- /**
- * We need access to the {@link StreamTask} to register timer callbacks.
- */
- private final StreamTask<?, ?> streamTask;
-
- @SuppressWarnings("unchecked")
- public StreamingRuntimeContext(
- Environment env,
- ExecutionConfig executionConfig,
- KeySelector<?, ?> statePartitioner,
- StateHandleProvider<?> provider,
- Map<String, Accumulator<?, ?>> accumulatorMap,
- StreamTask<?, ?> streamTask) {
-
- super(env.getTaskName(), env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
- env.getUserClassLoader(), executionConfig,
- env.getDistributedCacheEntries(), accumulatorMap);
-
- this.env = env;
- this.statePartitioner = statePartitioner;
- this.states = new HashMap<>();
- this.partitionedStates = new LinkedList<>();
- this.provider = (StateHandleProvider<Serializable>) provider;
- this.streamTask = streamTask;
- }
-
- /**
- * Returns the input split provider associated with the operator.
- *
- * @return The input split provider.
- */
- public InputSplitProvider getInputSplitProvider() {
- return env.getInputSplitProvider();
- }
-
- /**
- * Returns the stub parameters associated with the {@link TaskConfig} of the
- * operator.
- *
- * @return The stub parameters.
- */
- public Configuration getTaskStubParameters() {
- return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
- }
-
- public StateHandleProvider<Serializable> getStateHandleProvider() {
- return provider;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <S, C extends Serializable> OperatorState<S> getOperatorState(String name,
- S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException {
- if (defaultState == null) {
- throw new RuntimeException("Cannot set default state to null.");
- }
- StreamOperatorState<S, C> state = (StreamOperatorState<S, C>) getState(name, partitioned);
- state.setCheckpointer(checkpointer);
- state.setDefaultState(defaultState);
-
- return (OperatorState<S>) state;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
- boolean partitioned) throws IOException {
- if (defaultState == null) {
- throw new RuntimeException("Cannot set default state to null.");
- }
- StreamOperatorState<S, S> state = (StreamOperatorState<S, S>) getState(name, partitioned);
- state.setDefaultState(defaultState);
-
- return (OperatorState<S>) state;
- }
-
- public StreamOperatorState<?, ?> getState(String name, boolean partitioned) {
- // Try fetching state from the map
- StreamOperatorState<?, ?> state = states.get(name);
- if (state == null) {
- // If not found, create empty state and add to the map
- state = createRawState(partitioned);
- states.put(name, state);
- // We keep a reference to all partitioned states for registering input
- if (state instanceof PartitionedStreamOperatorState) {
- partitionedStates.add((PartitionedStreamOperatorState<?, ?, ?>) state);
- }
- }
- return state;
- }
-
- /**
- * Creates an empty {@link OperatorState}.
- *
- * @return An empty operator state.
- */
- @SuppressWarnings({"rawtypes", "unchecked"})
- public StreamOperatorState<?, ?> createRawState(boolean partitioned) {
- if (partitioned) {
- if (statePartitioner != null) {
- return new PartitionedStreamOperatorState(provider, statePartitioner, getUserCodeClassLoader());
- } else {
- throw new RuntimeException(
- "Partitioned state can only be used with KeyedStreams.");
- }
- } else {
- return new StreamOperatorState(provider);
- }
- }
-
- /**
- * Provides access to the all the states contained in the context
- *
- * @return All the states for the underlying operator.
- */
- public Map<String, StreamOperatorState<?, ?>> getOperatorStates() {
- return states;
- }
-
- /**
- * Register a timer callback. At the specified time the
- * {@code Triggerable } will be invoked. This call is guaranteed to not happen
- * concurrently with method calls on the operator.
- *
- * @param time The absolute time in milliseconds.
- * @param target The target to be triggered.
- */
- public void registerTimer(long time, Triggerable target) {
- streamTask.registerTimer(time, target);
- }
-
- /**
- * Sets the next input of the underlying operators, used to access
- * partitioned states.
- *
- * @param nextRecord
- * Next input of the operator.
- */
- @SuppressWarnings({"unchecked", "rawtypes"})
- public void setNextInput(StreamRecord<?> nextRecord) {
- if (statePartitioner != null) {
- for (PartitionedStreamOperatorState state : partitionedStates) {
- state.setCurrentInput(nextRecord.getValue());
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 25f1a76..d2d8a2e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
@@ -35,6 +36,9 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
@Override
public void init() throws Exception {
+ StreamConfig configuration = getConfiguration();
+ ClassLoader userClassLoader = getUserCodeClassLoader();
+
TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
@@ -75,10 +79,13 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
@Override
protected void run() throws Exception {
- while (running && inputProcessor.processInput(streamOperator, lock)) {
- if (timerException != null) {
- throw timerException;
- }
+ // cache some references on the stack, to make the code more JIT friendly
+ final TwoInputStreamOperator<IN1, IN2, OUT> operator = this.headOperator;
+ final StreamTwoInputProcessor<IN1, IN2> inputProcessor = this.inputProcessor;
+ final Object lock = getCheckpointLock();
+
+ while (running && inputProcessor.processInput(operator, lock)) {
+ checkTimerException();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index e002780..aeb5078 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -74,8 +74,7 @@ public class AggregationFunctionTest {
}
// some necessary boiler plate
- TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
- .getForObject(new Tuple2<>(0, 0));
+ TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor.getForObject(new Tuple2<>(0, 0));
ExecutionConfig config = new ExecutionConfig();
@@ -92,15 +91,15 @@ public class AggregationFunctionTest {
1, typeInfo, AggregationType.MAX, config);
List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
- new StreamGroupedReduce<>(sumFunction, keySelector, typeInfo),
+ new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
getInputList());
List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
- new StreamGroupedReduce<>(minFunction, keySelector, typeInfo),
+ new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
getInputList());
List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
- new StreamGroupedReduce<>(maxFunction, keySelector, typeInfo),
+ new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
getInputList());
assertEquals(expectedGroupSumList, groupedSumList);
@@ -156,13 +155,13 @@ public class AggregationFunctionTest {
false, config);
List<MyPojo> groupedSumList = MockContext.createAndExecute(
- new StreamGroupedReduce<>(sumFunction, keySelector, typeInfo),
+ new StreamGroupedReduce<>(sumFunction, typeInfo.createSerializer(config)),
getInputPojoList());
List<MyPojo> groupedMinList = MockContext.createAndExecute(
- new StreamGroupedReduce<>(minFunction, keySelector, typeInfo),
+ new StreamGroupedReduce<>(minFunction, typeInfo.createSerializer(config)),
getInputPojoList());
List<MyPojo> groupedMaxList = MockContext.createAndExecute(
- new StreamGroupedReduce<>(maxFunction, keySelector, typeInfo),
+ new StreamGroupedReduce<>(maxFunction, typeInfo.createSerializer(config)),
getInputPojoList());
assertEquals(expectedGroupSumList, groupedSumList);
@@ -216,16 +215,16 @@ public class AggregationFunctionTest {
new ComparableAggregator<>(1, typeInfo, AggregationType.MINBY, false, config);
assertEquals(maxByFirstExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(maxByFunctionFirst, keySelector, typeInfo),
+ new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
getInputByList()));
assertEquals(maxByLastExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(maxByFunctionLast, keySelector, typeInfo),
+ new StreamGroupedReduce<>(maxByFunctionLast, typeInfo.createSerializer(config)),
getInputByList()));
assertEquals(minByLastExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(minByFunctionLast, keySelector, typeInfo),
+ new StreamGroupedReduce<>(minByFunctionLast, typeInfo.createSerializer(config)),
getInputByList()));
assertEquals(minByFirstExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(minByFunctionFirst, keySelector, typeInfo),
+ new StreamGroupedReduce<>(minByFunctionFirst, typeInfo.createSerializer(config)),
getInputByList()));
}
@@ -274,16 +273,16 @@ public class AggregationFunctionTest {
new ComparableAggregator<>("f1", typeInfo, AggregationType.MINBY, false, config);
assertEquals(maxByFirstExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(maxByFunctionFirst, keySelector, typeInfo),
+ new StreamGroupedReduce<>(maxByFunctionFirst, typeInfo.createSerializer(config)),
getInputByPojoList()));
assertEquals(maxByLastExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(maxByFunctionLast, keySelector, typeInfo),
+ new StreamGroupedReduce<>(maxByFunctionLast, typeInfo.createSerializer(config)),
getInputByPojoList()));
assertEquals(minByLastExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(minByFunctionLast, keySelector, typeInfo),
+ new StreamGroupedReduce<>(minByFunctionLast, typeInfo.createSerializer(config)),
getInputByPojoList()));
assertEquals(minByFirstExpected, MockContext.createAndExecute(
- new StreamGroupedReduce<>(minByFunctionFirst, keySelector, typeInfo),
+ new StreamGroupedReduce<>(minByFunctionFirst, typeInfo.createSerializer(config)),
getInputByPojoList()));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 3a224e4..c23a4f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -538,7 +539,9 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
DataStreamSink<Long> sink2 = env.generateSequence(1, 100).keyBy(key1).print();
- assertTrue(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner() != null);
+ assertNotNull(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner());
+ assertNotNull(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStateKeySerializer());
+ assertNotNull(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStateKeySerializer());
assertEquals(key1, env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getStatePartitioner());
assertTrue(env.getStreamGraph().getStreamNode(sink2.getTransformation().getId()).getInEdges().get(0).getPartitioner() instanceof HashPartitioner);
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
index ac23cda..f7c6e53 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.functions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.junit.After;
import org.junit.Assert;
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 2246ffd..c316604 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -35,7 +35,8 @@ import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.EvenOddOutputSelector;
import org.apache.flink.streaming.util.NoOpIntMap;
import org.apache.flink.streaming.util.NoOpSink;
@@ -262,19 +263,13 @@ public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase
}
@Override
- public void processWatermark1(Watermark mark) throws Exception {
-
- }
+ public void processWatermark1(Watermark mark) throws Exception {}
@Override
- public void processWatermark2(Watermark mark) throws Exception {
-
- }
+ public void processWatermark2(Watermark mark) throws Exception {}
@Override
- public void setup(Output output, StreamingRuntimeContext runtimeContext) {
-
- }
+ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Integer>> output) {}
}
private static class OutputTypeConfigurableOperationWithOneInput
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
index bc5d614..1002b10 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
@@ -43,6 +43,7 @@ import org.junit.Test;
* <li>Watermarks are correctly forwarded</li>
* </ul>
*/
+@SuppressWarnings("serial")
public class StreamGroupedFoldTest {
private static class MyFolder implements FoldFunction<Integer, String> {
@@ -60,20 +61,17 @@ public class StreamGroupedFoldTest {
private TypeInformation<String> outType = TypeExtractor.getForClass(String.class);
@Test
- @SuppressWarnings("unchecked")
public void testGroupedFold() throws Exception {
- StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<>(
- new MyFolder(), new KeySelector<Integer, String>() {
-
- private static final long serialVersionUID = 1L;
-
+ KeySelector<Integer, String> keySelector = new KeySelector<Integer, String>() {
+
@Override
- public String getKey(Integer value) throws Exception {
+ public String getKey(Integer value) {
return value.toString();
}
- }, "100", inType);
-
+ };
+
+ StreamGroupedFold<Integer, String, String> operator = new StreamGroupedFold<>(new MyFolder(), "100");
operator.setOutputType(outType, new ExecutionConfig());
OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
@@ -102,15 +100,15 @@ public class StreamGroupedFoldTest {
@Test
public void testOpenClose() throws Exception {
- StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<>(new TestOpenCloseFoldFunction(), new KeySelector<Integer, Integer>() {
- private static final long serialVersionUID = 1L;
-
+ KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
@Override
- public Integer getKey(Integer value) throws Exception {
+ public Integer getKey(Integer value) {
return value;
}
- }, "init", inType);
-
+ };
+
+ StreamGroupedFold<Integer, String, Integer> operator = new StreamGroupedFold<>(
+ new TestOpenCloseFoldFunction(), "init");
operator.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
index 85d9bc1..b5d2bd6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
@@ -17,14 +17,14 @@
package org.apache.flink.streaming.api.operators;
-
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -46,9 +46,11 @@ import org.junit.Test;
public class StreamGroupedReduceTest {
@Test
- @SuppressWarnings("unchecked")
public void testGroupedReduce() throws Exception {
- StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<>(new MyReducer(), new IntegerKeySelector(), typeInfo);
+
+ KeySelector<Integer, Integer> keySelector = new IntegerKeySelector();
+
+ StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<>(new MyReducer(), IntSerializer.INSTANCE);
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
@@ -76,8 +78,11 @@ public class StreamGroupedReduceTest {
@Test
public void testOpenClose() throws Exception {
+
+ KeySelector<Integer, Integer> keySelector = new IntegerKeySelector();
+
StreamGroupedReduce<Integer> operator =
- new StreamGroupedReduce<>(new TestOpenCloseReduceFunction(), new IntegerKeySelector(), typeInfo);
+ new StreamGroupedReduce<>(new TestOpenCloseReduceFunction(), IntSerializer.INSTANCE);
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator);
long initialTime = 0L;
@@ -150,5 +155,5 @@ public class StreamGroupedReduceTest {
}
}
- private static TypeInformation<Integer> typeInfo = TypeExtractor.getForClass(Integer.class);
+ private static TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
new file mode 100644
index 0000000..73100d1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/FileStateBackendTest.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.state.filesystem.FileStreamStateHandle;
+import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.OperatingSystem;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+public class FileStateBackendTest {
+
+ @Test
+ public void testSetupAndSerialization() {
+ File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+ try {
+ final String backendDir = localFileUri(tempDir);
+ FsStateBackend originalBackend = new FsStateBackend(backendDir);
+
+ assertFalse(originalBackend.isInitialized());
+ assertEquals(new URI(backendDir), originalBackend.getBasePath().toUri());
+ assertNull(originalBackend.getCheckpointDirectory());
+
+ // serialize / copy the backend
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend);
+ assertFalse(backend.isInitialized());
+ assertEquals(new URI(backendDir), backend.getBasePath().toUri());
+ assertNull(backend.getCheckpointDirectory());
+
+ // no file operations should be possible right now
+ try {
+ backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
+ fail("should fail with an exception");
+ } catch (IllegalStateException e) {
+ // expected: the backend has not been initialized for a job yet
+ }
+
+ backend.initializeForJob(new JobID());
+ assertNotNull(backend.getCheckpointDirectory());
+
+ File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+ assertTrue(checkpointDir.exists());
+ assertTrue(isDirectoryEmpty(checkpointDir));
+
+ backend.disposeAllStateForCurrentJob();
+ assertNull(backend.getCheckpointDirectory());
+
+ assertTrue(isDirectoryEmpty(tempDir));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ deleteDirectorySilently(tempDir);
+ }
+ }
+
+ @Test
+ public void testSerializableState() {
+ File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+ try {
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+ backend.initializeForJob(new JobID());
+
+ File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+ String state1 = "dummy state";
+ String state2 = "row row row your boat";
+ Integer state3 = 42;
+
+ StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
+ StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
+ StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
+
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ assertEquals(state1, handle1.getState(getClass().getClassLoader()));
+ handle1.discardState();
+
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ assertEquals(state2, handle2.getState(getClass().getClassLoader()));
+ handle2.discardState();
+
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ assertEquals(state3, handle3.getState(getClass().getClassLoader()));
+ handle3.discardState();
+
+ assertTrue(isDirectoryEmpty(checkpointDir));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ deleteDirectorySilently(tempDir);
+ }
+ }
+
+ @Test
+ public void testStateOutputStream() {
+ File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+ try {
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+ backend.initializeForJob(new JobID());
+
+ File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+ byte[] state1 = new byte[1274673];
+ byte[] state2 = new byte[1];
+ byte[] state3 = new byte[0];
+ byte[] state4 = new byte[177];
+
+ Random rnd = new Random();
+ rnd.nextBytes(state1);
+ rnd.nextBytes(state2);
+ rnd.nextBytes(state3);
+ rnd.nextBytes(state4);
+
+ long checkpointId = 97231523452L;
+
+ FsStateBackend.FsCheckpointStateOutputStream stream1 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+ FsStateBackend.FsCheckpointStateOutputStream stream2 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+ FsStateBackend.FsCheckpointStateOutputStream stream3 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+
+ stream1.write(state1);
+ stream2.write(state2);
+ stream3.write(state3);
+
+ FileStreamStateHandle handle1 = stream1.closeAndGetHandle();
+ FileStreamStateHandle handle2 = stream2.closeAndGetHandle();
+ FileStreamStateHandle handle3 = stream3.closeAndGetHandle();
+
+ // use with try-with-resources
+ StreamStateHandle handle4;
+ try (StateBackend.CheckpointStateOutputStream stream4 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
+ stream4.write(state4);
+ handle4 = stream4.closeAndGetHandle();
+ }
+
+ // close before accessing handle
+ StateBackend.CheckpointStateOutputStream stream5 =
+ backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+ stream5.write(state4);
+ stream5.close();
+ try {
+ stream5.closeAndGetHandle();
+ fail();
+ } catch (IOException e) {
+ // expected: no handle can be obtained after the stream was closed
+ }
+
+ validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
+ handle1.discardState();
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ ensureLocalFileDeleted(handle1.getFilePath());
+
+ validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
+ handle2.discardState();
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ ensureLocalFileDeleted(handle2.getFilePath());
+
+ validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
+ handle3.discardState();
+ assertFalse(isDirectoryEmpty(checkpointDir));
+ ensureLocalFileDeleted(handle3.getFilePath());
+
+ validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
+ handle4.discardState();
+ assertTrue(isDirectoryEmpty(checkpointDir));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ deleteDirectorySilently(tempDir);
+ }
+ }
+
+ @Test
+ public void testKeyValueState() {
+ File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+ try {
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+ backend.initializeForJob(new JobID());
+
+ File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+ KvState<Integer, String, FsStateBackend> kv =
+ backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+ assertEquals(0, kv.size());
+
+ // some modifications to the state
+ kv.setCurrentKey(1);
+ assertNull(kv.value());
+ kv.update("1");
+ assertEquals(1, kv.size());
+ kv.setCurrentKey(2);
+ assertNull(kv.value());
+ kv.update("2");
+ assertEquals(2, kv.size());
+ kv.setCurrentKey(1);
+ assertEquals("1", kv.value());
+ assertEquals(2, kv.size());
+
+ // draw a snapshot
+ KvStateSnapshot<Integer, String, FsStateBackend> snapshot1 =
+ kv.shapshot(682375462378L, System.currentTimeMillis());
+
+ // make some more modifications
+ kv.setCurrentKey(1);
+ kv.update("u1");
+ kv.setCurrentKey(2);
+ kv.update("u2");
+ kv.setCurrentKey(3);
+ kv.update("u3");
+
+ // draw another snapshot
+ KvStateSnapshot<Integer, String, FsStateBackend> snapshot2 =
+ kv.shapshot(682375462379L, System.currentTimeMillis());
+
+ // validate the original state
+ assertEquals(3, kv.size());
+ kv.setCurrentKey(1);
+ assertEquals("u1", kv.value());
+ kv.setCurrentKey(2);
+ assertEquals("u2", kv.value());
+ kv.setCurrentKey(3);
+ assertEquals("u3", kv.value());
+
+ // restore the first snapshot and validate it
+ KvState<Integer, String, FsStateBackend> restored1 = snapshot1.restoreState(backend,
+ IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+ assertEquals(2, restored1.size());
+ restored1.setCurrentKey(1);
+ assertEquals("1", restored1.value());
+ restored1.setCurrentKey(2);
+ assertEquals("2", restored1.value());
+
+ // restore the second snapshot and validate it
+ KvState<Integer, String, FsStateBackend> restored2 = snapshot2.restoreState(backend,
+ IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+ assertEquals(3, restored2.size());
+ restored2.setCurrentKey(1);
+ assertEquals("u1", restored2.value());
+ restored2.setCurrentKey(2);
+ assertEquals("u2", restored2.value());
+ restored2.setCurrentKey(3);
+ assertEquals("u3", restored2.value());
+
+ snapshot1.discardState();
+ assertFalse(isDirectoryEmpty(checkpointDir));
+
+ snapshot2.discardState();
+ assertTrue(isDirectoryEmpty(checkpointDir));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ deleteDirectorySilently(tempDir);
+ }
+ }
+
+ @Test
+ public void testRestoreWithWrongSerializers() {
+ File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+ try {
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+ backend.initializeForJob(new JobID());
+
+ File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
+
+ KvState<Integer, String, FsStateBackend> kv =
+ backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+ kv.setCurrentKey(1);
+ kv.update("1");
+ kv.setCurrentKey(2);
+ kv.update("2");
+
+ KvStateSnapshot<Integer, String, FsStateBackend> snapshot =
+ kv.shapshot(682375462378L, System.currentTimeMillis());
+
+
+ @SuppressWarnings("unchecked")
+ TypeSerializer<Integer> fakeIntSerializer =
+ (TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+
+ @SuppressWarnings("unchecked")
+ TypeSerializer<String> fakeStringSerializer =
+ (TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
+
+ try {
+ snapshot.restoreState(backend, fakeIntSerializer,
+ StringSerializer.INSTANCE, null, getClass().getClassLoader());
+ fail("should recognize wrong serializers");
+ } catch (IllegalArgumentException e) {
+ // expected
+ } catch (Exception e) {
+ fail("wrong exception");
+ }
+
+ try {
+ snapshot.restoreState(backend, IntSerializer.INSTANCE,
+ fakeStringSerializer, null, getClass().getClassLoader());
+ fail("should recognize wrong serializers");
+ } catch (IllegalArgumentException e) {
+ // expected
+ } catch (Exception e) {
+ fail("wrong exception");
+ }
+
+ try {
+ snapshot.restoreState(backend, fakeIntSerializer,
+ fakeStringSerializer, null, getClass().getClassLoader());
+ fail("should recognize wrong serializers");
+ } catch (IllegalArgumentException e) {
+ // expected
+ } catch (Exception e) {
+ fail("wrong exception");
+ }
+
+ snapshot.discardState();
+
+ assertTrue(isDirectoryEmpty(checkpointDir));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ deleteDirectorySilently(tempDir);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static void ensureLocalFileDeleted(Path path) {
+ URI uri = path.toUri();
+ if ("file".equals(uri.getScheme())) {
+ File file = new File(uri.getPath());
+ assertFalse("file not properly deleted", file.exists());
+ }
+ else {
+ throw new IllegalArgumentException("not a local path");
+ }
+ }
+
+ private static void deleteDirectorySilently(File dir) {
+ try {
+ FileUtils.deleteDirectory(dir);
+ }
+ catch (IOException ignored) {}
+ }
+
+ private static boolean isDirectoryEmpty(File directory) {
+ String[] nested = directory.list();
+ return nested == null || nested.length == 0;
+ }
+
+ private static String localFileUri(File path) {
+ return (OperatingSystem.isWindows() ? "file:/" : "file://") + path.getAbsolutePath();
+ }
+
+ private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
+ byte[] holder = new byte[data.length];
+ assertEquals("not enough data", holder.length, is.read(holder));
+ assertEquals("too much data", -1, is.read());
+ assertArrayEquals("wrong data", data, holder);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
new file mode 100644
index 0000000..3410d09
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/MemoryStateBackendTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.state.memory.MemoryStateBackend}.
+ */
+public class MemoryStateBackendTest {
+
+ @Test
+ public void testSerializableState() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend();
+
+ HashMap<String, Integer> state = new HashMap<>();
+ state.put("hey there", 2);
+ state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+ StateHandle<HashMap<String, Integer>> handle = backend.checkpointStateSerializable(state, 12, 459);
+ assertNotNull(handle);
+
+ HashMap<String, Integer> restored = handle.getState(getClass().getClassLoader());
+ assertEquals(state, restored);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testOversizedState() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend(10);
+
+ HashMap<String, Integer> state = new HashMap<>();
+ state.put("hey there", 2);
+ state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+ try {
+ backend.checkpointStateSerializable(state, 12, 459);
+ fail("this should cause an exception");
+ }
+ catch (IOException e) {
+ // expected: the oversized state must be rejected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testStateStream() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend();
+
+ HashMap<String, Integer> state = new HashMap<>();
+ state.put("hey there", 2);
+ state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+ StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
+ ObjectOutputStream oos = new ObjectOutputStream(os);
+ oos.writeObject(state);
+ oos.flush();
+ StreamStateHandle handle = os.closeAndGetHandle();
+
+ assertNotNull(handle);
+
+ ObjectInputStream ois = new ObjectInputStream(handle.getState(getClass().getClassLoader()));
+ assertEquals(state, ois.readObject());
+ assertTrue(ois.available() <= 0);
+ ois.close();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testOversizedStateStream() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend(10);
+
+ HashMap<String, Integer> state = new HashMap<>();
+ state.put("hey there", 2);
+ state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77);
+
+ StateBackend.CheckpointStateOutputStream os = backend.createCheckpointStateOutputStream(1, 2);
+ ObjectOutputStream oos = new ObjectOutputStream(os);
+
+ try {
+ oos.writeObject(state);
+ oos.flush();
+ os.closeAndGetHandle();
+ fail("this should cause an exception");
+ }
+ catch (IOException e) {
+ // expected: the oversized state stream must be rejected
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testKeyValueState() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend();
+
+ KvState<Integer, String, MemoryStateBackend> kv =
+ backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+ assertEquals(0, kv.size());
+
+ // some modifications to the state
+ kv.setCurrentKey(1);
+ assertNull(kv.value());
+ kv.update("1");
+ assertEquals(1, kv.size());
+ kv.setCurrentKey(2);
+ assertNull(kv.value());
+ kv.update("2");
+ assertEquals(2, kv.size());
+ kv.setCurrentKey(1);
+ assertEquals("1", kv.value());
+ assertEquals(2, kv.size());
+
+ // draw a snapshot
+ KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot1 =
+ kv.shapshot(682375462378L, System.currentTimeMillis());
+
+ // make some more modifications
+ kv.setCurrentKey(1);
+ kv.update("u1");
+ kv.setCurrentKey(2);
+ kv.update("u2");
+ kv.setCurrentKey(3);
+ kv.update("u3");
+
+ // draw another snapshot
+ KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot2 =
+ kv.shapshot(682375462379L, System.currentTimeMillis());
+
+ // validate the original state
+ assertEquals(3, kv.size());
+ kv.setCurrentKey(1);
+ assertEquals("u1", kv.value());
+ kv.setCurrentKey(2);
+ assertEquals("u2", kv.value());
+ kv.setCurrentKey(3);
+ assertEquals("u3", kv.value());
+
+ // restore the first snapshot and validate it
+ KvState<Integer, String, MemoryStateBackend> restored1 = snapshot1.restoreState(backend,
+ IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+ assertEquals(2, restored1.size());
+ restored1.setCurrentKey(1);
+ assertEquals("1", restored1.value());
+ restored1.setCurrentKey(2);
+ assertEquals("2", restored1.value());
+
+ // restore the second snapshot and validate it
+ KvState<Integer, String, MemoryStateBackend> restored2 = snapshot2.restoreState(backend,
+ IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+
+ assertEquals(3, restored2.size());
+ restored2.setCurrentKey(1);
+ assertEquals("u1", restored2.value());
+ restored2.setCurrentKey(2);
+ assertEquals("u2", restored2.value());
+ restored2.setCurrentKey(3);
+ assertEquals("u3", restored2.value());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRestoreWithWrongSerializers() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend();
+ KvState<Integer, String, MemoryStateBackend> kv =
+ backend.createKvState(IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);
+
+ kv.setCurrentKey(1);
+ kv.update("1");
+ kv.setCurrentKey(2);
+ kv.update("2");
+
+ KvStateSnapshot<Integer, String, MemoryStateBackend> snapshot =
+ kv.shapshot(682375462378L, System.currentTimeMillis());
+
+
+ @SuppressWarnings("unchecked")
+ TypeSerializer<Integer> fakeIntSerializer =
+ (TypeSerializer<Integer>) (TypeSerializer<?>) FloatSerializer.INSTANCE;
+
+ @SuppressWarnings("unchecked")
+ TypeSerializer<String> fakeStringSerializer =
+ (TypeSerializer<String>) (TypeSerializer<?>) new ValueSerializer<StringValue>(StringValue.class);
+
+ try {
+ snapshot.restoreState(backend, fakeIntSerializer,
+ StringSerializer.INSTANCE, null, getClass().getClassLoader());
+ fail("should recognize wrong serializers");
+ } catch (IllegalArgumentException e) {
+ // expected
+ } catch (Exception e) {
+ fail("wrong exception");
+ }
+
+ try {
+ snapshot.restoreState(backend, IntSerializer.INSTANCE,
+ fakeStringSerializer, null, getClass().getClassLoader());
+ fail("should recognize wrong serializers");
+ } catch (IllegalArgumentException e) {
+ // expected
+ } catch (Exception e) {
+ fail("wrong exception");
+ }
+
+ try {
+ snapshot.restoreState(backend, fakeIntSerializer,
+ fakeStringSerializer, null, getClass().getClassLoader());
+ fail("should recognize wrong serializers");
+ } catch (IllegalArgumentException e) {
+ // expected
+ } catch (Exception e) {
+ fail("wrong exception");
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java
deleted file mode 100644
index d6a8a54..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StateHandleTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-public class StateHandleTest {
-
- @Test
- public void operatorStateHandleTest() throws Exception {
-
- MockHandle<Serializable> h1 = new MockHandle<Serializable>(1);
-
- OperatorStateHandle opHandle = new OperatorStateHandle(h1, true);
- assertEquals(1, opHandle.getState(this.getClass().getClassLoader()));
-
- OperatorStateHandle dsHandle = serializeDeserialize(opHandle);
- MockHandle<Serializable> h2 = (MockHandle<Serializable>) dsHandle.getHandle();
- assertFalse(h2.discarded);
- assertNotNull(h1.state);
- assertNull(h2.state);
-
- dsHandle.discardState();
-
- assertTrue(h2.discarded);
- }
-
- @Test
- public void wrapperStateHandleTest() throws Exception {
- final ClassLoader cl = this.getClass().getClassLoader();
-
- MockHandle<Serializable> h1 = new MockHandle<Serializable>(1);
- MockHandle<Serializable> h2 = new MockHandle<Serializable>(2);
- StateHandle<Serializable> h3 = new MockHandle<Serializable>(3);
-
- OperatorStateHandle opH1 = new OperatorStateHandle(h1, true);
- OperatorStateHandle opH2 = new OperatorStateHandle(h2, false);
-
- Map<String, OperatorStateHandle> opHandles = ImmutableMap.of("h1", opH1, "h2", opH2);
-
- Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> fullState = Tuple2.of(h3,
- opHandles);
-
- List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = ImmutableList
- .of(fullState);
-
- WrapperStateHandle wrapperHandle = new WrapperStateHandle(chainedStates);
-
- WrapperStateHandle dsWrapper = serializeDeserialize(wrapperHandle);
-
- @SuppressWarnings("unchecked")
- Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> dsFullState = ((List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) dsWrapper
- .getState(cl)).get(0);
-
- Map<String, OperatorStateHandle> dsOpHandles = dsFullState.f1;
-
- assertNull(dsFullState.f0.getState(cl));
- assertFalse(((MockHandle<?>) dsFullState.f0).discarded);
- assertFalse(((MockHandle<?>) dsOpHandles.get("h1").getHandle()).discarded);
- assertNull(dsOpHandles.get("h1").getState(cl));
- assertFalse(((MockHandle<?>) dsOpHandles.get("h2").getHandle()).discarded);
- assertNull(dsOpHandles.get("h2").getState(cl));
-
- dsWrapper.discardState();
-
- assertTrue(((MockHandle<?>) dsFullState.f0).discarded);
- assertTrue(((MockHandle<?>) dsOpHandles.get("h1").getHandle()).discarded);
- assertTrue(((MockHandle<?>) dsOpHandles.get("h2").getHandle()).discarded);
-
- }
-
- @SuppressWarnings("unchecked")
- private <X extends StateHandle<?>> X serializeDeserialize(X handle) throws IOException,
- ClassNotFoundException {
- byte[] serialized = InstantiationUtil.serializeObject(handle);
- return (X) InstantiationUtil.deserializeObject(serialized, Thread.currentThread()
- .getContextClassLoader());
- }
-
- @SuppressWarnings("serial")
- private static class MockHandle<T> implements StateHandle<T> {
-
- boolean discarded = false;
- transient T state;
-
- public MockHandle(T state) {
- this.state = state;
- }
-
- @Override
- public void discardState() {
- state = null;
- discarded = true;
- }
-
- @Override
- public T getState(ClassLoader userCodeClassLoader) {
- return state;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
deleted file mode 100644
index ead3af8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Test the functionality supported by stateful user functions for both
- * partitioned and non-partitioned user states. This test mimics the runtime
- * behavior of stateful stream operators.
- */
-public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
-
- @Test
- public void simpleStateTest() throws Exception {
-
- List<String> out = new ArrayList<String>();
-
- StreamMap<Integer, String> map = createOperatorWithContext(out, new ModKey(2), null);
- StreamingRuntimeContext context = map.getRuntimeContext();
-
- processInputs(map, Arrays.asList(1, 2, 3, 4, 5));
-
- assertEquals(Arrays.asList("1", "2", "3", "4", "5"), out);
- assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).value());
- assertEquals(ImmutableMap.of(0, new MutableInt(2), 1, new MutableInt(3)), context.getOperatorStates().get("groupCounter").getPartitionedState());
- assertEquals("12345", context.getOperatorState("concat", "", false).value());
- assertEquals((Integer) 5, ((StatefulMapper) map.getUserFunction()).checkpointedCounter);
-
- byte[] serializedState0 = InstantiationUtil.serializeObject(map.getStateSnapshotFromFunction(1, 1));
- // Restore state but snapshot again before calling the value
- byte[] serializedState = InstantiationUtil.serializeObject(createOperatorWithContext(out,
- new ModKey(2), serializedState0).getStateSnapshotFromFunction(1, 1));
-
- StreamMap<Integer, String> restoredMap = createOperatorWithContext(out, new ModKey(2), serializedState);
- StreamingRuntimeContext restoredContext = restoredMap.getRuntimeContext();
-
- assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).value());
- assertEquals(ImmutableMap.of(0, new MutableInt(2), 1, new MutableInt(3)), context.getOperatorStates().get("groupCounter").getPartitionedState());
- assertEquals("12345", restoredContext.getOperatorState("concat", "", false).value());
- assertEquals((Integer) 5, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
- out.clear();
-
- processInputs(restoredMap, Arrays.asList(7, 8));
-
- assertEquals(Arrays.asList("7", "8"), out);
- assertEquals((Integer) 7, restoredContext.getOperatorState("counter", 0, false).value());
- assertEquals(ImmutableMap.of(0, new MutableInt(3), 1, new MutableInt(4)), restoredContext.getOperatorStates().get("groupCounter")
- .getPartitionedState());
- assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).value());
- assertEquals((Integer) 7, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
-
- }
-
- @Test
- public void apiTest() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
-
- KeyedStream<Integer, Integer> keyedStream = env
- .fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6))
- .keyBy(new ModKey(4));
-
- keyedStream.map(new StatefulMapper()).addSink(new SinkFunction<String>() {
- private static final long serialVersionUID = 1L;
-
- public void invoke(String value) throws Exception {
- }
- });
-
- keyedStream.map(new StatefulMapper2()).setParallelism(1).addSink(new SinkFunction<String>() {
- private static final long serialVersionUID = 1L;
-
- public void invoke(String value) throws Exception {
- }
- });
-
- try {
- keyedStream.shuffle();
- fail();
- } catch (UnsupportedOperationException e) {
-
- }
-
- env.fromElements(0, 1, 2, 2, 2, 3, 4, 3, 4).keyBy(new KeySelector<Integer, Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value;
- }
-
- }).map(new PStateKeyRemovalTestMapper()).setParallelism(1).addSink(new SinkFunction<String>() {
- private static final long serialVersionUID = 1L;
-
- public void invoke(String value) throws Exception {
- }
- });
-
- env.execute();
- }
-
- private void processInputs(StreamMap<Integer, ?> map, List<Integer> input) throws Exception {
- for (Integer i : input) {
- map.getRuntimeContext().setNextInput(new StreamRecord<Integer>(i, 0L));
- map.processElement(new StreamRecord<Integer>(i, 0L));
- }
- }
-
- @SuppressWarnings("unchecked")
- private StreamMap<Integer, String> createOperatorWithContext(List<String> output,
- KeySelector<Integer, ? extends Serializable> partitioner, byte[] serializedState) throws Exception {
- final List<String> outputList = output;
-
- StreamingRuntimeContext context = new StreamingRuntimeContext(
- new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
- new ExecutionConfig(),
- partitioner,
- new LocalStateHandleProvider<Serializable>(),
- new HashMap<String, Accumulator<?, ?>>(),
- null);
-
- StreamMap<Integer, String> op = new StreamMap<Integer, String>(new StatefulMapper());
-
- op.setup(new Output<StreamRecord<String>>() {
-
- @Override
- public void collect(StreamRecord<String> record) {
- outputList.add(record.getValue());
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
-
- }
-
- @Override
- public void close() {
- }
- }, context);
-
- if (serializedState != null) {
- ClassLoader cl = Thread.currentThread().getContextClassLoader();
- op.restoreInitialState((Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>) InstantiationUtil
- .deserializeObject(serializedState, cl));
- }
-
- op.open(null);
-
- return op;
- }
-
- public static class StatefulMapper extends RichMapFunction<Integer, String> implements
- Checkpointed<Integer> {
- private static final long serialVersionUID = -9007873655253339356L;
- OperatorState<Integer> counter;
- OperatorState<MutableInt> groupCounter;
- OperatorState<String> concat;
-
- Integer checkpointedCounter = 0;
-
- @Override
- public String map(Integer value) throws Exception {
- counter.update(counter.value() + 1);
- MutableInt incremented = groupCounter.value();
- incremented.increment();
- groupCounter.update(incremented);
- concat.update(concat.value() + value.toString());
- checkpointedCounter++;
- try {
- counter.update(null);
- fail();
- } catch (RuntimeException e) {
- }
- return value.toString();
- }
-
- @Override
- public void open(Configuration conf) throws IOException {
- counter = getRuntimeContext().getOperatorState("counter", 0, false, intCheckpointer);
- groupCounter = getRuntimeContext().getOperatorState("groupCounter", new MutableInt(0), true);
- concat = getRuntimeContext().getOperatorState("concat", "", false);
- try {
- getRuntimeContext().getOperatorState("test", null, true);
- fail();
- } catch (RuntimeException e) {
- }
- try {
- getRuntimeContext().getOperatorState("test", null, true, null);
- fail();
- } catch (RuntimeException e) {
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void close() throws Exception {
- Map<String, StreamOperatorState<?, ?>> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
- PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
- for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
- Integer key = (Integer) count.getKey();
- Integer expected = key < 3 ? 2 : 1;
-
- assertEquals(new MutableInt(expected), count.getValue());
- }
- }
-
- @Override
- public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- return checkpointedCounter;
- }
-
- @Override
- public void restoreState(Integer state) {
- this.checkpointedCounter = (Integer) state;
- }
- }
-
- public static class StatefulMapper2 extends RichMapFunction<Integer, String> {
- private static final long serialVersionUID = 1L;
- OperatorState<Integer> groupCounter;
-
- @Override
- public String map(Integer value) throws Exception {
- groupCounter.update(groupCounter.value() + 1);
-
- return value.toString();
- }
-
- @Override
- public void open(Configuration conf) throws IOException {
- groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true, intCheckpointer);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void close() throws Exception {
- Map<String, StreamOperatorState<?, ?>> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
- PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter =
- (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
- for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
- Integer key = (Integer) count.getKey();
- Integer expected = key < 3 ? 2 : 1;
- assertEquals(expected, count.getValue());
- }
- }
-
- }
-
- public static StateCheckpointer<Integer, String> intCheckpointer = new StateCheckpointer<Integer, String>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String snapshotState(Integer state, long checkpointId, long checkpointTimestamp) {
- return state.toString();
- }
-
- @Override
- public Integer restoreState(String stateSnapshot) {
- return Integer.parseInt(stateSnapshot);
- }
- };
-
- public static class PStateKeyRemovalTestMapper extends RichMapFunction<Integer, String> {
-
- private static final long serialVersionUID = 1L;
- OperatorState<Boolean> seen;
-
- @Override
- public String map(Integer value) throws Exception {
- if (value == 0) {
- seen.update(null);
- }else{
- Boolean s = seen.value();
- if (s) {
- seen.update(null);
- } else {
- seen.update(true);
- }
- }
-
- return value.toString();
- }
-
- public void open(Configuration c) throws IOException {
- seen = getRuntimeContext().getOperatorState("seen", false, true);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void close() throws Exception {
- Map<String, StreamOperatorState<?, ?>> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
- PartitionedStreamOperatorState<Integer, Boolean, Boolean> seen = (PartitionedStreamOperatorState<Integer, Boolean, Boolean>) states.get("seen");
- assertFalse(seen.getPartitionedState().containsKey(0));
- assertEquals(2,seen.getPartitionedState().size());
- for (Entry<Serializable, Boolean> s : seen.getPartitionedState().entrySet()) {
- assertTrue(s.getValue());
- }
- }
-
- }
-
- public static class ModKey implements KeySelector<Integer, Integer> {
-
- private static final long serialVersionUID = 4193026742083046736L;
-
- int base;
-
- public ModKey(int base) {
- this.base = base;
- }
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value % base;
- }
-
- }
-
-}